You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/05 11:25:49 UTC
[incubator-uniffle] branch master updated: [MINOR][IMPROVEMENT]Make the conf of rss.storage.basePath as list (#130)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new c836714 [MINOR][IMPROVEMENT]Make the conf of rss.storage.basePath as list (#130)
c836714 is described below
commit c8367140713cca15c71a04b2b17caefe464e500b
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Fri Aug 5 19:25:44 2022 +0800
[MINOR][IMPROVEMENT]Make the conf of rss.storage.basePath as list (#130)
### What changes were proposed in this pull request?
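Change the type of the `rss.storage.basePath` configuration option from a single comma-separated string to a list of strings (`ConfigOption<List<String>>`), so that callers no longer need to split the value themselves.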
### Why are the changes needed?
This is a follow-up to PR #9.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No new tests are needed; the existing tests were updated to set the option as a list.
---
.../apache/uniffle/common/config/RssBaseConf.java | 3 ++-
.../test/AssignmentServerNodesNumberTest.java | 21 ++++++++++--------
.../uniffle/test/DiskErrorToleranceTest.java | 7 ++++--
.../test/HealthCheckCoordinatorGrpcTest.java | 25 +++++++++++-----------
.../test/MultiStorageFaultToleranceTest.java | 4 ++--
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 3 ++-
.../test/ShuffleServerWithMemLocalHdfsTest.java | 3 ++-
.../uniffle/test/ShuffleServerWithMemoryTest.java | 3 ++-
.../RepartitionWithHdfsMultiStorageRssTest.java | 3 ++-
.../RepartitionWithMemoryMultiStorageRssTest.java | 3 ++-
.../uniffle/test/RepartitionWithMemoryRssTest.java | 3 ++-
.../test/SparkSQLWithDelegationShuffleManager.java | 3 ++-
...arkSQLWithDelegationShuffleManagerFallback.java | 3 ++-
.../apache/uniffle/server/LocalStorageChecker.java | 6 +++---
.../apache/uniffle/server/ShuffleFlushManager.java | 6 +++---
.../server/storage/LocalStorageManager.java | 20 ++++++++---------
.../org/apache/uniffle/server/HealthCheckTest.java | 7 +++---
.../uniffle/server/ShuffleFlushManagerTest.java | 9 +++++---
.../uniffle/server/ShuffleServerConfTest.java | 10 ++++-----
.../uniffle/server/ShuffleServerMetricsTest.java | 3 ++-
.../apache/uniffle/server/ShuffleServerTest.java | 7 ++++--
.../uniffle/server/ShuffleTaskManagerTest.java | 11 +++++-----
.../apache/uniffle/server/StorageCheckerTest.java | 3 ++-
.../server/buffer/ShuffleBufferManagerTest.java | 16 ++++++++------
.../server/storage/HdfsStorageManagerTest.java | 13 +++++------
.../server/storage/LocalStorageManagerTest.java | 23 ++++++++++----------
.../server/storage/MultiStorageManagerTest.java | 7 +++---
27 files changed, 128 insertions(+), 97 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 09df89d..496e6ef 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -139,9 +139,10 @@ public class RssBaseConf extends RssConf {
.defaultValue(1)
.withDescription("Data replica in storage");
- public static final ConfigOption<String> RSS_STORAGE_BASE_PATH = ConfigOptions
+ public static final ConfigOption<List<String>> RSS_STORAGE_BASE_PATH = ConfigOptions
.key("rss.storage.basePath")
.stringType()
+ .asList()
.noDefaultValue()
.withDescription("Common storage path for remote shuffle data");
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
index 57bf341..29ff97a 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
@@ -17,13 +17,18 @@
package org.apache.uniffle.test;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import com.google.common.collect.Sets;
-import com.google.common.io.Files;
import java.io.File;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
@@ -31,10 +36,8 @@ import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class AssignmentServerNodesNumberTest extends CoordinatorTestBase {
private static final Logger LOG = LoggerFactory.getLogger(AssignmentServerNodesNumberTest.class);
@@ -55,7 +58,7 @@ public class AssignmentServerNodesNumberTest extends CoordinatorTestBase {
File dataDir1 = new File(tmpDir, "data1");
String basePath = dataDir1.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
index 49ae26c..31a9f90 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/DiskErrorToleranceTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -68,8 +69,10 @@ public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
- data1.getAbsolutePath() + "," + data2.getAbsolutePath());
+ shuffleServerConf.set(
+ ShuffleServerConf.RSS_STORAGE_BASE_PATH,
+ Arrays.asList(data1.getAbsolutePath(), data2.getAbsolutePath())
+ );
shuffleServerConf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true);
createShuffleServer(shuffleServerConf);
startServers();
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
index 54bac8e..22c92ab 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/HealthCheckCoordinatorGrpcTest.java
@@ -17,8 +17,19 @@
package org.apache.uniffle.test;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
import com.google.common.collect.Sets;
+import com.google.common.io.Files;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.client.request.RssGetShuffleAssignmentsRequest;
import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssGetShuffleAssignmentsResponse;
@@ -28,16 +39,6 @@ import org.apache.uniffle.coordinator.ServerNode;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import com.google.common.io.Files;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -75,7 +76,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, data1.getAbsolutePath());
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(data1.getAbsolutePath()));
shuffleServerConf.setDouble(ShuffleServerConf.HEALTH_STORAGE_RECOVERY_USAGE_PERCENTAGE, healthUsage);
shuffleServerConf.setDouble(ShuffleServerConf.HEALTH_STORAGE_MAX_USAGE_PERCENTAGE, maxUsage);
shuffleServerConf.setLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL, 1000L);
@@ -83,7 +84,7 @@ public class HealthCheckCoordinatorGrpcTest extends CoordinatorTestBase {
shuffleServerConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 18081);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, data2.getAbsolutePath());
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(data2.getAbsolutePath()));
shuffleServerConf.setDouble(ShuffleServerConf.HEALTH_STORAGE_RECOVERY_USAGE_PERCENTAGE, healthUsage);
shuffleServerConf.setDouble(ShuffleServerConf.HEALTH_STORAGE_MAX_USAGE_PERCENTAGE, maxUsage);
shuffleServerConf.setLong(ShuffleServerConf.HEALTH_CHECK_INTERVAL, 1000L);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
index 7e83a7b..a0fe6ab 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/MultiStorageFaultToleranceTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -26,7 +27,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.uniffle.client.util.DefaultIdHelper;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.BeforeAll;
@@ -68,7 +68,7 @@ public class MultiStorageFaultToleranceTest extends ShuffleReadWriteBase {
shuffleServerConf.setLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 60L * 1000L * 60L);
shuffleServerConf.setLong(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 20L * 1000L);
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 400L * 1024L * 1024L);
createAndStartServers(shuffleServerConf, coordinatorConf);
}
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index a1cc6a1..52bcc9c 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,7 +85,7 @@ public class ShuffleServerGrpcTest extends IntegrationTestBase {
String basePath = dataDir1.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, EVENT_THRESHOLD_SIZE);
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 5000L);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index 2d02e66..4898af8 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -66,7 +67,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends ShuffleReadWriteBase {
tmpDir.deleteOnExit();
File dataDir = new File(tmpDir, "data");
String basePath = dataDir.getAbsolutePath();
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 450L);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 3c26312..3532e8e 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -64,7 +65,7 @@ public class ShuffleServerWithMemoryTest extends ShuffleReadWriteBase {
File dataDir = new File(tmpDir, "data");
String basePath = dataDir.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 5000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
shuffleServerConf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 40.0);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
index 4552a2d..4d71ea5 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHdfsMultiStorageRssTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -47,7 +48,7 @@ public class RepartitionWithHdfsMultiStorageRssTest extends RepartitionTest {
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1024L * 1024L);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryMultiStorageRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryMultiStorageRssTest.java
index 2cbe5ac..232a31b 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryMultiStorageRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryMultiStorageRssTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -47,7 +48,7 @@ public class RepartitionWithMemoryMultiStorageRssTest extends RepartitionTest {
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 1024L * 1024L);
createShuffleServer(shuffleServerConf);
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
index f15e142..9061a78 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -50,7 +51,7 @@ public class RepartitionWithMemoryRssTest extends RepartitionTest {
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
createShuffleServer(shuffleServerConf);
startServers();
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
index 4c6d2aa..7583601 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManager.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -58,7 +59,7 @@ public class SparkSQLWithDelegationShuffleManager extends SparkSQLTest {
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
createShuffleServer(shuffleServerConf);
startServers();
diff --git a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
index 3e2cee4..eace92a 100644
--- a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
+++ b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallback.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -59,7 +60,7 @@ public class SparkSQLWithDelegationShuffleManagerFallback extends SparkSQLTest {
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(), "512mb");
createShuffleServer(shuffleServerConf);
startServers();
diff --git a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 1a54ee2..b24e7e8 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -25,9 +25,9 @@ import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,8 +46,8 @@ public class LocalStorageChecker extends Checker {
public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage> storages) {
super(conf);
- String basePathStr = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
- if (StringUtils.isEmpty(basePathStr)) {
+ List<String> basePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+ if (CollectionUtils.isEmpty(basePaths)) {
throw new IllegalArgumentException("The base path cannot be empty");
}
String storageType = conf.getString(ShuffleServerConf.RSS_STORAGE_TYPE);
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 46f16d3..996018a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -50,7 +50,7 @@ public class ShuffleFlushManager {
private final ShuffleServer shuffleServer;
private final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue();
private final ThreadPoolExecutor threadPoolExecutor;
- private final String[] storageBasePaths;
+ private final List<String> storageBasePaths;
private final String shuffleServerId;
private final String storageType;
private final int storageDataReplica;
@@ -85,7 +85,7 @@ public class ShuffleFlushManager {
long keepAliveTime = shuffleServerConf.getLong(ShuffleServerConf.SERVER_FLUSH_THREAD_ALIVE);
threadPoolExecutor = new ThreadPoolExecutor(poolSize, poolSize, keepAliveTime, TimeUnit.SECONDS, waitQueue,
ThreadUtils.getThreadFactory("FlushEventThreadPool"));
- storageBasePaths = shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH).split(",");
+ storageBasePaths = shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
pendingEventTimeoutSec = shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
// the thread for flush data
Runnable processEventRunnable = () -> {
@@ -164,7 +164,7 @@ public class ShuffleFlushManager {
event.getShuffleId(),
event.getStartPartition(),
event.getEndPartition(),
- storageBasePaths,
+ storageBasePaths.toArray(new String[storageBasePaths.size()]),
shuffleServerId,
hadoopConf,
storageDataReplica));
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 5b05dbe..f0ed715 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -30,6 +30,7 @@ import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
@@ -57,7 +58,7 @@ public class LocalStorageManager extends SingleStorageManager {
private static final Logger LOG = LoggerFactory.getLogger(LocalStorageManager.class);
private final List<LocalStorage> localStorages;
- private final String[] storageBasePaths;
+ private final List<String> storageBasePaths;
private final LocalStorageChecker checker;
private List<LocalStorage> unCorruptedStorages = Lists.newArrayList();
private final Set<String> corruptedStorages = Sets.newConcurrentHashSet();
@@ -65,11 +66,10 @@ public class LocalStorageManager extends SingleStorageManager {
@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
super(conf);
- String storageBasePathStr = conf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
- if (StringUtils.isEmpty(storageBasePathStr)) {
+ storageBasePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+ if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}
- storageBasePaths = storageBasePathStr.split(",");
long shuffleExpiredTimeoutMs = conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS);
long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
double highWaterMarkOfWrite = conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
@@ -79,13 +79,13 @@ public class LocalStorageManager extends SingleStorageManager {
}
// We must make sure the order of `storageBasePaths` and `localStorages` is same, or some unit test may be fail
- CountDownLatch countDownLatch = new CountDownLatch(storageBasePaths.length);
+ CountDownLatch countDownLatch = new CountDownLatch(storageBasePaths.size());
AtomicInteger successCount = new AtomicInteger();
ExecutorService executorService = Executors.newCachedThreadPool();
- LocalStorage[] localStorageArray = new LocalStorage[storageBasePaths.length];
- for (int i = 0; i < storageBasePaths.length; i++) {
+ LocalStorage[] localStorageArray = new LocalStorage[storageBasePaths.size()];
+ for (int i = 0; i < storageBasePaths.size(); i++) {
final int idx = i;
- String storagePath = storageBasePaths[i];
+ String storagePath = storageBasePaths.get(i);
executorService.submit(() -> {
try {
localStorageArray[idx] = LocalStorage.newBuilder()
@@ -111,7 +111,7 @@ public class LocalStorageManager extends SingleStorageManager {
}
executorService.shutdown();
- int failedCount = storageBasePaths.length - successCount.get();
+ int failedCount = storageBasePaths.size() - successCount.get();
long maxFailedNumber = conf.getLong(LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER);
if (failedCount > maxFailedNumber || successCount.get() == 0) {
throw new RuntimeException(
@@ -182,7 +182,7 @@ public class LocalStorageManager extends SingleStorageManager {
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration()));
- deleteHandler.delete(storageBasePaths, appId);
+ deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId);
}
@Override
diff --git a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java b/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
index 4517c7b..ace60f4 100644
--- a/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/HealthCheckTest.java
@@ -17,11 +17,12 @@
package org.apache.uniffle.server;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import com.google.common.collect.Lists;
import org.junit.jupiter.api.Test;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -36,7 +37,7 @@ public class HealthCheckTest {
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "");
assertConf(conf);
conf.setString(ShuffleServerConf.HEALTH_CHECKER_CLASS_NAMES.key(), "org.apache.uniffle.server.LocalStorageChecker");
- conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "s1");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("s1"));
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
assertConf(conf);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index d137d5f..7002274 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -19,6 +19,8 @@ package org.apache.uniffle.server;
import java.io.File;
import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
@@ -30,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.prometheus.client.Gauge;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -71,7 +74,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
@BeforeEach
public void prepare() {
ShuffleServerMetrics.register();
- shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "");
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
LogManager.getRootLogger().setLevel(Level.INFO);
}
@@ -236,7 +239,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
String appId1 = "clearLocalTest_appId1";
String appId2 = "clearLocalTest_appId2";
ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, tempDir.getAbsolutePath());
+ serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
StorageManager storageManager =
@@ -381,7 +384,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
public void processPendingEventsTest(@TempDir File tempDir) {
try {
shuffleServerConf.set(RssBaseConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.toString());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, tempDir.getAbsolutePath());
+ shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 100L);
shuffleServerConf.set(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC, 5L);
StorageManager storageManager =
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
index 4c49493..c508958 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java
@@ -17,18 +17,18 @@
package org.apache.uniffle.server;
-import org.apache.uniffle.common.util.ByteUnit;
+import java.io.File;
+
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.jupiter.SystemStub;
import uk.org.webcompere.systemstubs.jupiter.SystemStubsExtension;
-import java.io.File;
+import org.apache.uniffle.common.util.ByteUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(SystemStubsExtension.class)
public class ShuffleServerConfTest {
@@ -51,7 +51,7 @@ public class ShuffleServerConfTest {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf(null);
assertEquals(1234, shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
assertEquals("HDFS", shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_TYPE));
- assertEquals("/var/tmp/test", shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH));
+ assertEquals("/var/tmp/test", shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0));
environmentVariables.set("RSS_HOME", (new File(confFile)).getParent() + "/wrong_dir/");
assertFalse(shuffleServerConf.loadConfFromFile(null));
}
@@ -61,7 +61,7 @@ public class ShuffleServerConfTest {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf(confFile);
assertEquals(1234, shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
assertEquals("FILE", shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_TYPE));
- assertEquals("/var/tmp/test", shuffleServerConf.getString(ShuffleServerConf.RSS_STORAGE_BASE_PATH));
+ assertEquals("/var/tmp/test", shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0));
assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index 1b0dd14..1aee302 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -54,7 +55,7 @@ public class ShuffleServerMetricsTest {
ssc.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
ssc.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 128);
ssc.set(ShuffleServerConf.RPC_SERVER_PORT, 12346);
- ssc.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "tmp");
+ ssc.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("tmp"));
ssc.set(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
ssc.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
ssc.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "fake.coordinator:123");
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index b3def5f..bc0639a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -17,10 +17,13 @@
package org.apache.uniffle.server;
+import java.util.Arrays;
+
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.common.util.ExitUtils;
import org.apache.uniffle.common.util.ExitUtils.ExitException;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
@@ -35,7 +38,7 @@ public class ShuffleServerTest {
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:0");
- serverConf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/tmp/null");
+ serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/null"));
serverConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
serverConf.setLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 100);
serverConf.setLong(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY, 10);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 4a3d5b2..46c762f 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -26,22 +27,22 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.collect.RangeMap;
import com.google.common.collect.Sets;
-import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.hadoop.conf.Configuration;
-import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
-import org.apache.uniffle.server.buffer.ShuffleBuffer;
-import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HdfsTestBase;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
@@ -234,7 +235,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
- conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, storageBasePath);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
diff --git a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
index 0431040..5338977 100644
--- a/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/StorageCheckerTest.java
@@ -23,6 +23,7 @@ import org.apache.uniffle.storage.util.StorageType;
import org.junit.jupiter.api.Test;
import java.io.File;
+import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -37,7 +38,7 @@ public class StorageCheckerTest {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setBoolean(ShuffleServerConf.HEALTH_CHECK_ENABLE, true);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "st1,st2,st3");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("st1", "st2", "st3"));
conf.set(ShuffleServerConf.HEALTH_MIN_STORAGE_PERCENTAGE, 55.0);
List<LocalStorage> storages = Lists.newArrayList();
storages.add(LocalStorage.newBuilder().basePath("st1").build());
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index ecd4d43..a7a0078 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -17,8 +17,16 @@
package org.apache.uniffle.server.buffer;
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
import com.google.common.collect.RangeMap;
import com.google.common.io.Files;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
import org.apache.uniffle.common.util.Constants;
@@ -30,12 +38,6 @@ import org.apache.uniffle.server.StatusCode;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -59,7 +61,7 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
File tmpDir = Files.createTempDir();
File dataDir = new File(tmpDir, "data");
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, dataDir.getAbsolutePath());
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(dataDir.getAbsolutePath()));
conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 500L);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 20.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 80.0);
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
index edb7762..1ad7967 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
@@ -17,20 +17,21 @@
package org.apache.uniffle.server.storage;
+import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.server.ShuffleServerConf;
+import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.storage.common.HdfsStorage;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -52,7 +53,7 @@ public class HdfsStorageManagerTest {
public void testRegisterRemoteStorage() {
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "test");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
HdfsStorageManager hdfsStorageManager = new HdfsStorageManager(conf);
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
index ee7d412..b1231f0 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/LocalStorageManagerTest.java
@@ -18,20 +18,21 @@
package org.apache.uniffle.server.storage;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
/**
@@ -54,7 +55,7 @@ public class LocalStorageManagerTest {
String[] storagePaths = {"/tmp/rssdata", "/tmp/rssdata2"};
ShuffleServerConf conf = new ShuffleServerConf();
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, String.join(",", storagePaths));
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storagePaths));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
LocalStorageManager localStorageManager = new LocalStorageManager(conf);
@@ -73,7 +74,7 @@ public class LocalStorageManagerTest {
// case1: when no candidates, it should throw exception.
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/a/rss-data,/b/rss-data");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/b/rss-data"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L);
conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 1);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
@@ -85,17 +86,17 @@ public class LocalStorageManagerTest {
}
// case2: when candidates exist, it should initialize successfully.
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/a/rss-data,/tmp/rss-data-1");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/tmp/rss-data-1"));
LocalStorageManager localStorageManager = new LocalStorageManager(conf);
assertEquals(1, localStorageManager.getStorages().size());
// case3: all ok
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/tmp/rss-data-1,/tmp/rss-data-2");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/tmp/rss-data-1", "/tmp/rss-data-2"));
localStorageManager = new LocalStorageManager(conf);
assertEquals(2, localStorageManager.getStorages().size());
// case4: only have 1 candidates, but exceed the number of rss.server.localstorage.initialize.max.fail.number
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/a/rss-data,/tmp/rss-data-1");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/tmp/rss-data-1"));
conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 0L);
try {
localStorageManager = new LocalStorageManager(conf);
@@ -105,7 +106,7 @@ public class LocalStorageManagerTest {
}
// case5: if failed=2, but lower than rss.server.localstorage.initialize.max.fail.number, should exit
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "/a/rss-data,/b/rss-data-1");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/a/rss-data", "/b/rss-data-1"));
conf.setLong(ShuffleServerConf.LOCAL_STORAGE_INITIALIZE_MAX_FAIL_NUMBER, 10L);
try {
localStorageManager = new LocalStorageManager(conf);
diff --git a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
index 9a944e4..3c65159 100644
--- a/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/storage/MultiStorageManagerTest.java
@@ -17,15 +17,16 @@
package org.apache.uniffle.server.storage;
+import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
-import org.apache.uniffle.server.ShuffleDataFlushEvent;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.server.ShuffleDataFlushEvent;
+import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.common.HdfsStorage;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.StorageType;
@@ -40,7 +41,7 @@ public class MultiStorageManagerTest {
String appId = "selectStorageManagerTest_appId";
ShuffleServerConf conf = new ShuffleServerConf();
conf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 2000L);
- conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH, "test");
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList("test"));
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE_HDFS.name());
MultiStorageManager manager = new MultiStorageManager(conf);