You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by xi...@apache.org on 2023/02/23 01:35:11 UTC
[incubator-uniffle] branch master updated: [#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)
This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d1a3bd14 [#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)
d1a3bd14 is described below
commit d1a3bd14b3428385bc03fac79118afb85fa2be3f
Author: roryqi <ro...@apache.org>
AuthorDate: Thu Feb 23 09:35:06 2023 +0800
[#647] fix: Multiple coordinators produce conflicts when they detect the same file (#648)
### What changes were proposed in this pull request?
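Append the coordinator's id to the name of the test file (`rssTest-<coordinatorId>`) that each coordinator uses to detect remote storage, so that multiple coordinators no longer operate on the same file.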
### Why are the changes needed?
Fix #647
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
---
.../java/org/apache/uniffle/coordinator/CoordinatorServer.java | 3 +++
.../strategy/storage/AbstractSelectStorageStrategy.java | 8 ++++++++
.../strategy/storage/AppBalanceSelectStorageStrategy.java | 2 +-
.../strategy/storage/LowestIOSampleCostSelectStorageStrategy.java | 2 +-
.../org/apache/uniffle/coordinator/util/CoordinatorUtils.java | 2 ++
.../strategy/storage/AppBalanceSelectStorageStrategyTest.java | 4 ++++
.../storage/LowestIOSampleCostSelectStorageStrategyTest.java | 4 ++++
7 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index 671055e7..a946062a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -40,6 +40,7 @@ import org.apache.uniffle.coordinator.metric.CoordinatorGrpcMetrics;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
@@ -161,6 +162,8 @@ public class CoordinatorServer extends ReconfigurableBase {
}
SecurityContextFactory.get().init(securityConfig);
+ coordinatorConf.setString(CoordinatorUtils.COORDINATOR_ID, id);
+
// load default hadoop configuration
Configuration hadoopConf = new Configuration();
ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, hadoopConf);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index a6564ce2..b4f7c699 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.coordinator.strategy.storage;
import java.io.IOException;
import java.util.Map;
+import java.util.UUID;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -28,6 +29,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
/**
* This is a simple implementation class, which provides some methods to check whether the path is normal
@@ -38,12 +40,14 @@ public abstract class AbstractSelectStorageStrategy implements SelectStorageStra
*/
protected final Map<String, RankValue> remoteStoragePathRankValue;
protected final int fileSize;
+ private final String coordinatorId;
public AbstractSelectStorageStrategy(
Map<String, RankValue> remoteStoragePathRankValue,
CoordinatorConf conf) {
this.remoteStoragePathRankValue = remoteStoragePathRankValue;
fileSize = conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID, UUID.randomUUID().toString());
}
public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
@@ -71,4 +75,8 @@ public abstract class AbstractSelectStorageStrategy implements SelectStorageStra
} while (readBytes != -1);
}
}
+
+ String getCoordinatorId() {
+ return coordinatorId;
+ }
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index 94e421cf..d2932256 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -89,7 +89,7 @@ public class AppBalanceSelectStorageStrategy extends AbstractSelectStorageStrate
RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
rankValue.setHealthy(new AtomicBoolean(true));
Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest";
+ String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
Path testPath = new Path(rssTest);
try {
FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, hdfsConf);
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index f3d9d61f..7414e4b2 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -102,7 +102,7 @@ public class LowestIOSampleCostSelectStorageStrategy extends AbstractSelectStora
for (Map.Entry<String, RankValue> uri : uris) {
if (uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest";
+ String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
Path testPath = new Path(rssTest);
RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
rankValue.setHealthy(new AtomicBoolean(true));
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
index c1247e30..ce18e805 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/util/CoordinatorUtils.java
@@ -38,6 +38,8 @@ public class CoordinatorUtils {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorUtils.class);
+ public static final String COORDINATOR_ID = "coordinator.id";
+
public static GetShuffleAssignmentsResponse toGetShuffleAssignmentsResponse(
PartitionRangeAssignment pra) {
List<RssProtos.PartitionRangeAssignment> praList = pra.convertToGrpcProto();
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
index d5a8ac1d..e30ef2e6 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.APP_BALANCE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -60,6 +61,7 @@ public class AppBalanceSelectStorageStrategyTest {
conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, APP_BALANCE);
conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 1000);
+ conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
applicationManager = new ApplicationManager(conf);
// to ensure that the reading and writing of hdfs can be controlled
applicationManager.closeDetectStorageScheduler();
@@ -174,5 +176,7 @@ public class AppBalanceSelectStorageStrategyTest {
assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
assertFalse(applicationManager.hasErrorInStatusCheck());
+ assertEquals("TESTXXX",
+ ((AppBalanceSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
}
}
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
index 5dae0b4c..0a383b1c 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategyTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
+import org.apache.uniffle.coordinator.util.CoordinatorUtils;
import static org.apache.uniffle.coordinator.ApplicationManager.StrategyName.IO_SAMPLE;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,6 +64,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
conf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, appExpiredTime);
conf.setLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME, 1000);
conf.set(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY, IO_SAMPLE);
+ conf.setString(CoordinatorUtils.COORDINATOR_ID, "TESTXXX");
applicationManager = new ApplicationManager(conf);
selectStorageStrategy = (LowestIOSampleCostSelectStorageStrategy) applicationManager.getSelectStorageStrategy();
// to ensure that the reading and writing of hdfs can be controlled
@@ -191,5 +193,7 @@ public class LowestIOSampleCostSelectStorageStrategyTest {
assertEquals(0, applicationManager.getAvailableRemoteStorageInfo().size());
assertEquals(0, applicationManager.getRemoteStoragePathRankValue().size());
assertFalse(applicationManager.hasErrorInStatusCheck());
+ assertEquals("TESTXXX",
+ ((LowestIOSampleCostSelectStorageStrategy)applicationManager.getSelectStorageStrategy()).getCoordinatorId());
}
}