You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/02/23 01:35:11 UTC

[incubator-uniffle] branch master updated: [#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)

This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d1a3bd14 [#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)
d1a3bd14 is described below

commit d1a3bd14b3428385bc03fac79118afb85fa2be3f
Author: roryqi <ro...@apache.org>
AuthorDate: Thu Feb 23 09:35:06 2023 +0800

    [#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)
    
    ### What changes were proposed in this pull request?
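    Append the coordinator's ID to the name of the test file ("rssTest-<id>") that each coordinator writes when checking remote storage, so coordinators no longer read, write, or delete the same file.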
    
    ### Why are the changes needed?
    Fix #647
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit tests (UT)
---
 .../java/org/apache/uniffle/coordinator/CoordinatorServer.java    | 3 +++
 .../strategy/storage/AbstractSelectStorageStrategy.java           | 8 ++++++++
 .../strategy/storage/AppBalanceSelectStorageStrategy.java         | 2 +-
 .../strategy/storage/LowestIOSampleCostSelectStorageStrategy.java | 2 +-
 .../org/apache/uniffle/coordinator/util/CoordinatorUtils.java     | 2 ++
 .../strategy/storage/AppBalanceSelectStorageStrategyTest.java     | 4 ++++
 .../storage/LowestIOSampleCostSelectStorageStrategyTest.java      | 4 ++++
 7 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 671055e7..a946062a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
 import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
 import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -161,6 +162,8 @@ public class CoordinatorServer extends ReconfigurableBase {
     }
     SecurityContextFactory.get().init(securityConfig);
 
+    coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
+
     // load default hadoop configuration
     Configuration hadoopConf = new Configuration();
     ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, hadoopConf);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index a6564ce2..b4f7c699 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator.strategy.storage;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 /**
  * This is a simple implementation class, which provides some methods to check whether the path is normal
@@ -38,12 +40,14 @@ public abstract class AbstractSelectStorageStrategy implements SelectStorageStra
    */
   protected final Map<String, RankValue> remoteStoragePathRankValue;
   protected final int fileSize;
+  private final String coordinatorId;
 
   public AbstractSelectStorageStrategy(
       Map<String, RankValue> remoteStoragePathRankValue,
       CoordinatorConf conf) {
     this.remoteStoragePathRankValue = remoteStoragePathRankValue;
     fileSize = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+    this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID, UUID.randomUUID().toString());
   }
 
   public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
@@ -71,4 +75,8 @@ public abstract class AbstractSelectStorageStrategy implements SelectStorageStra
       } while (readBytes != -1);
     }
   }
+
+  String getCoordinatorId() {
+    return coordinatorId;
+  }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index 94e421cf..d2932256 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -89,7 +89,7 @@ public class AppBalanceSelectStorageStrategy extends AbstractSelectStorageStrate
           RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
           rankValue.setHealthy(new AtomicBoolean(true));
           Path remotePath = new Path(uri.getKey());
-          String rssTest = uri.getKey() + "/rssTest";
+          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
           Path testPath = new Path(rssTest);
           try {
             FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, hdfsConf);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index f3d9d61f..7414e4b2 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -102,7 +102,7 @@ public class LowestIOSampleCostSelectStorageStrategy extends AbstractSelectStora
       for (Map.Entry<String, RankValue> uri : uris) {
         if (uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
           Path remotePath = new Path(uri.getKey());
-          String rssTest = uri.getKey() + "/rssTest";
+          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
           Path testPath = new Path(rssTest);
           RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
           rankValue.setHealthy(new AtomicBoolean(true));
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index c1247e30..ce18e805 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -38,6 +38,8 @@ public class CoordinatorUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoordinatorUtils.class);
 
+  public static final String COORDINATOR_ID = "coordinator.id";
+
   public static GetShuffleAssignmentsResponse toGetShuffleAssignmentsResponse(
       PartitionRangeAssignment pra) {
     List<RssProtos.PartitionRangeAssignment> praList = pra.convertToGrpcProto();
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index d5a8ac1d..e30ef2e6 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +61,7 @@ public class AppBalanceSelectStorageStrategyTest {
     conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
     conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE);
     conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 1000);
+    conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
     applicationManager = new ApplicationManager(conf);
     // to ensure that the reading and writing of hdfs can be controlled
     applicationManager.closeDetectStorageScheduler();
@@ -174,5 +176,7 @@ public class AppBalanceSelectStorageStrategyTest {
     assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
     assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
     assertFalse(applicationManager.hasErrorInStatusCheck());
+    assertEquals("TESTXXX",
+        ((AppBalanceSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
   }
 }
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 5dae0b4c..0a383b1c 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
 import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,6 +64,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
     conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 1000);
     conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+    conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
     applicationManager = new ApplicationManager(conf);
     selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
     // to ensure that the reading and writing of hdfs can be controlled
@@ -191,5 +193,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
     assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
     assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
     assertFalse(applicationManager.hasErrorInStatusCheck());
+    assertEquals("TESTXXX",
+        ((LowestIOSampleCostSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
   }
 }