You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/02/10 06:30:44 UTC
[incubator-uniffle] branch master updated: [#410] feat: support the hot reload of coordinator's configuration (#572)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ab17e778 [#410] feat: support the hot reload of coordinator's configuration (#572)
ab17e778 is described below
commit ab17e778a3231cbe55b6d11ed90167d82bb20ad6
Author: roryqi <ro...@apache.org>
AuthorDate: Fri Feb 10 14:30:39 2023 +0800
[#410] feat: support the hot reload of coordinator's configuration (#572)
### What changes were proposed in this pull request?
I refer to Hadoop implement. I design a hot reload process. We support nodeMax at first.
### Why are the changes needed?
It's more convenient.
### Does this PR introduce _any_ user-facing change?
add `rss.reconfigure.interval.sec`.
### How was this patch tested?
UT
Co-authored-by: roryqi <ro...@tencent.com>
Co-authored-by: Kaijie Chen <ck...@apache.org>
---
.../uniffle/common/config/Reconfigurable.java | 46 ++++--------
.../uniffle/common/config/ReconfigurableBase.java | 82 ++++++++++++++++++++++
.../apache/uniffle/common/config/RssBaseConf.java | 7 ++
.../apache/uniffle/coordinator/AccessManager.java | 22 +++++-
.../apache/uniffle/coordinator/ClusterManager.java | 4 +-
.../uniffle/coordinator/CoordinatorServer.java | 31 +++++++-
.../uniffle/coordinator/SimpleClusterManager.java | 17 ++++-
.../access/checker/AccessClusterLoadChecker.java | 20 +++++-
docs/coordinator_guide.md | 1 +
.../uniffle/test/CoordinatorAssignmentTest.java | 34 +++++++++
10 files changed, 225 insertions(+), 39 deletions(-)
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java b/common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
similarity index 51%
copy from coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
copy to common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
index 8db4abc5..15f7cf35 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
@@ -15,43 +15,23 @@
* limitations under the License.
*/
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.common.config;
-import java.io.Closeable;
-import java.util.List;
-import java.util.Set;
-
-public interface ClusterManager extends Closeable {
-
- /**
- * Add a server to the cluster.
- *
- * @param shuffleServerInfo server info
- */
- void add(ServerNode shuffleServerInfo);
+public interface Reconfigurable {
/**
- * Get available nodes from the cluster
- *
- * @param requiredTags tags for filter
- * @return list of available server nodes
- */
- List<ServerNode> getServerList(Set<String> requiredTags);
+ * The method use new configuration to reconfigure the component
+ * @param conf means that new configuration after modification
+ **/
+ void reconfigure(RssConf conf);
+ // todo: Mark the properties is reloadable or not in the ConfigOptionBuilder
/**
- * @return number of server nodes in the cluster
- */
- int getNodesNum();
+ * This method judge whether the property could be reconfigurable or not.
+ * @param property means that property name.
+ * @return True means that the property could be reconfigurable.
+ * False means that the property couldn't be reconfigurable.
+ **/
+ boolean isPropertyReconfigurable(String property);
- /**
- * @return list all server nodes in the cluster
- */
- List<ServerNode> list();
-
- int getShuffleNodesMax();
-
- /**
- * @return whether to be ready for serving
- */
- boolean isReadyForServe();
}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java b/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java
new file mode 100644
index 00000000..7f0fa978
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public abstract class ReconfigurableBase implements Reconfigurable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
+ public static final String RECONFIGURABLE_FILE_NAME = "reconfigurable.file.name";
+
+ private final RssConf rssConf;
+
+ private final ScheduledExecutorService scheduledExecutorService;
+ private final long checkIntervalSec;
+ private final AtomicLong lastModify = new AtomicLong(0L);
+
+ public ReconfigurableBase(RssConf rssConf) {
+ this.rssConf = rssConf;
+ scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("ReconfigurableThread-%d"));
+ checkIntervalSec = rssConf.getLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC);
+ }
+
+ public void startReconfigureThread() {
+ scheduledExecutorService.scheduleAtFixedRate(
+ this::checkConfiguration, checkIntervalSec, checkIntervalSec, TimeUnit.SECONDS);
+ }
+
+ public void stopReconfigureThread() {
+ scheduledExecutorService.shutdown();
+ }
+
+ private void checkConfiguration() {
+ String fileName = rssConf.getString(RECONFIGURABLE_FILE_NAME, "");
+ if (fileName.isEmpty()) {
+ LOG.warn("Config file name isn't set, we skip checking");
+ return;
+ }
+ File configFile = new File(fileName);
+ if (!configFile.exists()) {
+ LOG.warn("Config file doesn't exist, we skip checking");
+ return;
+ }
+ long newLastModify = configFile.lastModified();
+ if (lastModify.get() == 0) {
+ lastModify.set(newLastModify);
+ return;
+ }
+ if (newLastModify != lastModify.get()) {
+ LOG.warn("Server detect the modification of file {}, we start to reconfigure", fileName);
+ lastModify.set(newLastModify);
+ reconfigure(reloadConfiguration());
+ }
+ }
+
+ protected abstract RssConf reloadConfiguration();
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 1d794fb3..761e5bf3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -205,6 +205,13 @@ public class RssBaseConf extends RssConf {
.noDefaultValue()
.withDescription("The class of metrics reporter.");
+ public static final ConfigOption<Long> RSS_RECONFIGURE_INTERVAL_SEC = ConfigOptions
+ .key("rss.reconfigure.interval.sec")
+ .longType()
+ .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "The value must be posite long")
+ .defaultValue(5L)
+ .withDescription("Reconfigure check interval.");
+
public boolean loadCommonConf(Map<String, String> properties) {
if (properties == null) {
return false;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index ea5a6b21..8914e87a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.Reconfigurable;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
@@ -33,7 +35,7 @@ import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.access.checker.AccessChecker;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
-public class AccessManager {
+public class AccessManager implements Reconfigurable {
private static final Logger LOG = LoggerFactory.getLogger(AccessManager.class);
@@ -107,4 +109,22 @@ public class AccessManager {
checker.close();
}
}
+
+ public boolean isPropertyReconfigurable(String property) {
+ for (AccessChecker checker : accessCheckers) {
+ if (checker instanceof Reconfigurable
+ && ((Reconfigurable) checker).isPropertyReconfigurable(property)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public void reconfigure(RssConf conf) {
+ for (AccessChecker checker : accessCheckers) {
+ if (checker instanceof Reconfigurable) {
+ ((Reconfigurable) checker).reconfigure(conf);
+ }
+ }
+ }
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 8db4abc5..5139b10b 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -21,7 +21,9 @@ import java.io.Closeable;
import java.util.List;
import java.util.Set;
-public interface ClusterManager extends Closeable {
+import org.apache.uniffle.common.config.Reconfigurable;
+
+public interface ClusterManager extends Closeable, Reconfigurable {
/**
* Add a server to the cluster.
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index e1a126be..671055e7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
import picocli.CommandLine;
import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.config.ReconfigurableBase;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.GRPCMetrics;
import org.apache.uniffle.common.metrics.JvmMetrics;
import org.apache.uniffle.common.metrics.MetricReporter;
@@ -48,7 +50,7 @@ import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_K
/**
* The main entrance of coordinator service
*/
-public class CoordinatorServer {
+public class CoordinatorServer extends ReconfigurableBase {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
@@ -65,6 +67,7 @@ public class CoordinatorServer {
private String id;
public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
+ super(coordinatorConf);
this.coordinatorConf = coordinatorConf;
try {
initialization();
@@ -84,6 +87,8 @@ public class CoordinatorServer {
// Load configuration from config files
final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
+ coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+
// Start the coordinator service
final CoordinatorServer coordinatorServer = new CoordinatorServer(coordinatorConf);
@@ -92,6 +97,7 @@ public class CoordinatorServer {
}
public void start() throws Exception {
+ startReconfigureThread();
jettyServer.start();
server.start();
@@ -126,6 +132,7 @@ public class CoordinatorServer {
metricReporter.stop();
LOG.info("Metric Reporter Stopped!");
}
+ stopReconfigureThread();
SecurityContextFactory.get().getSecurityContext().close();
server.stop();
}
@@ -241,4 +248,26 @@ public class CoordinatorServer {
private void blockUntilShutdown() throws InterruptedException {
server.blockUntilShutdown();
}
+
+ @Override
+ public void reconfigure(RssConf conf) {
+ clusterManager.reconfigure(conf);
+ accessManager.reconfigure(conf);
+ }
+
+ @Override
+ public boolean isPropertyReconfigurable(String property) {
+ if (clusterManager.isPropertyReconfigurable(property)) {
+ return true;
+ }
+ if (accessManager.isPropertyReconfigurable(property)) {
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public RssConf reloadConfiguration() {
+ return new CoordinatorConf(coordinatorConf.getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, ""));
+ }
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index ae1f6d23..dbe5df33 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -58,7 +59,7 @@ public class SimpleClusterManager implements ClusterManager {
private Map<String, Set<ServerNode>> tagToNodes = Maps.newConcurrentMap();
private AtomicLong excludeLastModify = new AtomicLong(0L);
private long heartbeatTimeout;
- private int shuffleNodesMax;
+ private volatile int shuffleNodesMax;
private ScheduledExecutorService scheduledExecutorService;
private ScheduledExecutorService checkNodesExecutorService;
private FileSystem hadoopFileSystem;
@@ -278,4 +279,18 @@ public class SimpleClusterManager implements ClusterManager {
public void setStartupSilentPeriodEnabled(boolean startupSilentPeriodEnabled) {
this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
}
+
+ @Override
+ public void reconfigure(RssConf conf) {
+ int nodeMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+ if (nodeMax != shuffleNodesMax) {
+ LOG.warn("Coordinator update new shuffleNodesMax " + nodeMax);
+ shuffleNodesMax = nodeMax;
+ }
+ }
+
+ @Override
+ public boolean isPropertyReconfigurable(String property) {
+ return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
+ }
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 773e68a8..67eb584d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.Reconfigurable;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ClusterManager;
@@ -39,7 +41,7 @@ import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUF
* AccessClusterLoadChecker use the cluster load metrics including memory and healthy to
* filter and count available nodes numbers and reject if the number do not reach the threshold.
*/
-public class AccessClusterLoadChecker extends AbstractAccessChecker {
+public class AccessClusterLoadChecker extends AbstractAccessChecker implements Reconfigurable {
private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);
@@ -47,7 +49,7 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
private final double memoryPercentThreshold;
// The hard constraint number of available shuffle servers
private final int availableServerNumThreshold;
- private final int defaultRequiredShuffleServerNumber;
+ private volatile int defaultRequiredShuffleServerNumber;
public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
super(accessManager);
@@ -114,4 +116,18 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
public void close() {
}
+
+ @Override
+ public void reconfigure(RssConf conf) {
+ int nodeMax = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+ if (nodeMax != defaultRequiredShuffleServerNumber) {
+ LOG.warn("Coordinator update new defaultRequiredShuffleServerNumber " + nodeMax);
+ defaultRequiredShuffleServerNumber = nodeMax;
+ }
+ }
+
+ @Override
+ public boolean isPropertyReconfigurable(String property) {
+ return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
+ }
}
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index ddc7f4dd..d53a5b37 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -106,6 +106,7 @@ This document will introduce how to deploy Uniffle coordinators.
|rss.coordinator.quota.default.path|-|A configuration file for the number of apps for a user-defined user.|
|rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
|rss.metrics.reporter.class|-|The class of metrics reporter.|
+|rss.reconfigure.interval.sec|5|Reconfigure check interval.|
### AccessClusterLoadChecker settings
|Property Name|Default| Description|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index e73d8738..cf93d224 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -18,11 +18,14 @@
package org.apache.uniffle.test;
import java.io.File;
+import java.io.FileWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -32,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.config.ReconfigurableBase;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.SimpleClusterManager;
@@ -54,6 +58,9 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
CoordinatorConf coordinatorConf1 = getCoordinatorConf();
coordinatorConf1.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
coordinatorConf1.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
+ coordinatorConf1.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,
+ new File(tmpDir, "coordinator.conf").getPath());
+ coordinatorConf1.setLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC, 1L);
createCoordinatorServer(coordinatorConf1);
CoordinatorConf coordinatorConf2 = getCoordinatorConf();
@@ -61,6 +68,9 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
coordinatorConf2.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
coordinatorConf2.setInteger(CoordinatorConf.RPC_SERVER_PORT, COORDINATOR_PORT_2);
coordinatorConf2.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_2);
+ coordinatorConf2.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,
+ new File(tmpDir, "coordinator.conf").getPath());
+ coordinatorConf2.setLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC, 1L);
createCoordinatorServer(coordinatorConf2);
for (int i = 0; i < SERVER_NUM; i++) {
@@ -142,4 +152,28 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM - 1, -1);
assertEquals(SHUFFLE_NODES_MAX - 1, info.getServerToPartitionRanges().keySet().size());
}
+
+ @Test
+ public void testReconfigureNodeMax() throws Exception {
+ String fileName = coordinators.get(0).getCoordinatorConf()
+ .getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,"");
+ new File(fileName).createNewFile();
+ ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
+ 1, 1, 1, true, 1, 1, 10, 10);
+ shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+ ShuffleAssignmentsInfo info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1);
+ assertEquals(SHUFFLE_NODES_MAX, info.getServerToPartitionRanges().keySet().size());
+ Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+ try (FileWriter fileWriter = new FileWriter(fileName)) {
+ fileWriter.append(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key() + " " + 5);
+ }
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1);
+ assertEquals(5, info.getServerToPartitionRanges().keySet().size());
+ try (FileWriter fileWriter = new FileWriter(fileName)) {
+ fileWriter.append(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key() + " " + 10);
+ }
+ Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ }
+
}