You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/02/10 06:30:44 UTC

[incubator-uniffle] branch master updated: [#410] feat: support the hot reload of coordinator's configuration (#572)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ab17e778 [#410] feat: support the hot reload of coordinator's configuration (#572)
ab17e778 is described below

commit ab17e778a3231cbe55b6d11ed90167d82bb20ad6
Author: roryqi <ro...@apache.org>
AuthorDate: Fri Feb 10 14:30:39 2023 +0800

    [#410] feat: support the hot reload of coordinator's configuration (#572)
    
    ### What changes were proposed in this pull request?
    I refer to Hadoop implement. I design a hot reload process. We support nodeMax at first.
    
    ### Why are the changes needed?
    It's more convenient.
    
    ### Does this PR introduce _any_ user-facing change?
    add `rss.reconfigure.interval.sec`.
    
    ### How was this patch tested?
    UT
    
    Co-authored-by: roryqi <ro...@tencent.com>
    Co-authored-by: Kaijie Chen <ck...@apache.org>
---
 .../uniffle/common/config/Reconfigurable.java      | 46 ++++--------
 .../uniffle/common/config/ReconfigurableBase.java  | 82 ++++++++++++++++++++++
 .../apache/uniffle/common/config/RssBaseConf.java  |  7 ++
 .../apache/uniffle/coordinator/AccessManager.java  | 22 +++++-
 .../apache/uniffle/coordinator/ClusterManager.java |  4 +-
 .../uniffle/coordinator/CoordinatorServer.java     | 31 +++++++-
 .../uniffle/coordinator/SimpleClusterManager.java  | 17 ++++-
 .../access/checker/AccessClusterLoadChecker.java   | 20 +++++-
 docs/coordinator_guide.md                          |  1 +
 .../uniffle/test/CoordinatorAssignmentTest.java    | 34 +++++++++
 10 files changed, 225 insertions(+), 39 deletions(-)

diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java b/common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
similarity index 51%
copy from coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
copy to common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
index 8db4abc5..15f7cf35 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/Reconfigurable.java
@@ -15,43 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.uniffle.coordinator;
+package org.apache.uniffle.common.config;
 
-import java.io.Closeable;
-import java.util.List;
-import java.util.Set;
-
-public interface ClusterManager extends Closeable {
-
-  /**
-   * Add a server to the cluster.
-   *
-   * @param shuffleServerInfo server info
-   */
-  void add(ServerNode shuffleServerInfo);
+public interface Reconfigurable {
 
   /**
-   * Get available nodes from the cluster
-   *
-   * @param requiredTags tags for filter
-   * @return list of available server nodes
-   */
-  List<ServerNode> getServerList(Set<String> requiredTags);
+   * The method use new configuration to reconfigure the component
+   * @param conf means that new configuration after modification
+   **/
+  void reconfigure(RssConf conf);
 
+  // todo: Mark the properties is reloadable or not in the ConfigOptionBuilder
   /**
-   * @return number of server nodes in the cluster
-   */
-  int getNodesNum();
+   * This method judge whether the property could be reconfigurable or not.
+   * @param property means that property name.
+   * @return True means that the property could be reconfigurable.
+   *         False means that the property couldn't be reconfigurable.
+   **/
+  boolean isPropertyReconfigurable(String property);
 
-  /**
-   * @return list all server nodes in the cluster
-   */
-  List<ServerNode> list();
-
-  int getShuffleNodesMax();
-
-  /**
-   * @return whether to be ready for serving
-   */
-  boolean isReadyForServe();
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java b/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java
new file mode 100644
index 00000000..7f0fa978
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/config/ReconfigurableBase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import java.io.File;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public abstract class ReconfigurableBase implements Reconfigurable {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ReconfigurableBase.class);
+  public static final String RECONFIGURABLE_FILE_NAME = "reconfigurable.file.name";
+
+  private final RssConf rssConf;
+
+  private final ScheduledExecutorService scheduledExecutorService;
+  private final long checkIntervalSec;
+  private final AtomicLong lastModify = new AtomicLong(0L);
+
+  public ReconfigurableBase(RssConf rssConf) {
+    this.rssConf = rssConf;
+    scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+        ThreadUtils.getThreadFactory("ReconfigurableThread-%d"));
+    checkIntervalSec = rssConf.getLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC);
+  }
+
+  public void startReconfigureThread() {
+    scheduledExecutorService.scheduleAtFixedRate(
+        this::checkConfiguration, checkIntervalSec, checkIntervalSec, TimeUnit.SECONDS);
+  }
+
+  public void stopReconfigureThread() {
+    scheduledExecutorService.shutdown();
+  }
+
+  private void checkConfiguration() {
+    String fileName = rssConf.getString(RECONFIGURABLE_FILE_NAME, "");
+    if (fileName.isEmpty()) {
+      LOG.warn("Config file name isn't set, we skip checking");
+      return;
+    }
+    File configFile = new File(fileName);
+    if (!configFile.exists()) {
+      LOG.warn("Config file doesn't exist, we skip checking");
+      return;
+    }
+    long newLastModify = configFile.lastModified();
+    if (lastModify.get() == 0) {
+      lastModify.set(newLastModify);
+      return;
+    }
+    if (newLastModify != lastModify.get()) {
+      LOG.warn("Server detect the modification of file {}, we start to reconfigure", fileName);
+      lastModify.set(newLastModify);
+      reconfigure(reloadConfiguration());
+    }
+  }
+
+  protected abstract RssConf reloadConfiguration();
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index 1d794fb3..761e5bf3 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -205,6 +205,13 @@ public class RssBaseConf extends RssConf {
       .noDefaultValue()
       .withDescription("The class of metrics reporter.");
 
+  public static final ConfigOption<Long> RSS_RECONFIGURE_INTERVAL_SEC = ConfigOptions
+      .key("rss.reconfigure.interval.sec")
+      .longType()
+      .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "The value must be posite long")
+      .defaultValue(5L)
+      .withDescription("Reconfigure check interval.");
+
   public boolean loadCommonConf(Map<String, String> properties) {
     if (properties == null) {
       return false;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
index ea5a6b21..8914e87a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessManager.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.Reconfigurable;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.access.AccessCheckResult;
@@ -33,7 +35,7 @@ import org.apache.uniffle.coordinator.access.AccessInfo;
 import org.apache.uniffle.coordinator.access.checker.AccessChecker;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
 
-public class AccessManager {
+public class AccessManager implements Reconfigurable {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessManager.class);
 
@@ -107,4 +109,22 @@ public class AccessManager {
       checker.close();
     }
   }
+
+  public boolean isPropertyReconfigurable(String property) {
+    for (AccessChecker checker : accessCheckers) {
+      if (checker instanceof Reconfigurable
+          && ((Reconfigurable) checker).isPropertyReconfigurable(property)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public void reconfigure(RssConf conf) {
+    for (AccessChecker checker : accessCheckers) {
+      if (checker instanceof Reconfigurable) {
+        ((Reconfigurable) checker).reconfigure(conf);
+      }
+    }
+  }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index 8db4abc5..5139b10b 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -21,7 +21,9 @@ import java.io.Closeable;
 import java.util.List;
 import java.util.Set;
 
-public interface ClusterManager extends Closeable {
+import org.apache.uniffle.common.config.Reconfigurable;
+
+public interface ClusterManager extends Closeable, Reconfigurable {
 
   /**
    * Add a server to the cluster.
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
index e1a126be..671055e7 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
 import picocli.CommandLine;
 
 import org.apache.uniffle.common.Arguments;
+import org.apache.uniffle.common.config.ReconfigurableBase;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.metrics.GRPCMetrics;
 import org.apache.uniffle.common.metrics.JvmMetrics;
 import org.apache.uniffle.common.metrics.MetricReporter;
@@ -48,7 +50,7 @@ import static org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_K
 /**
  * The main entrance of coordinator service
  */
-public class CoordinatorServer {
+public class CoordinatorServer extends ReconfigurableBase {
 
   private static final Logger LOG = LoggerFactory.getLogger(CoordinatorServer.class);
 
@@ -65,6 +67,7 @@ public class CoordinatorServer {
   private String id;
 
   public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception {
+    super(coordinatorConf);
     this.coordinatorConf = coordinatorConf;
     try {
       initialization();
@@ -84,6 +87,8 @@ public class CoordinatorServer {
     // Load configuration from config files
     final CoordinatorConf coordinatorConf = new CoordinatorConf(configFile);
 
+    coordinatorConf.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, configFile);
+
     // Start the coordinator service
     final CoordinatorServer coordinatorServer = new CoordinatorServer(coordinatorConf);
 
@@ -92,6 +97,7 @@ public class CoordinatorServer {
   }
 
   public void start() throws Exception {
+    startReconfigureThread();
     jettyServer.start();
     server.start();
 
@@ -126,6 +132,7 @@ public class CoordinatorServer {
       metricReporter.stop();
       LOG.info("Metric Reporter Stopped!");
     }
+    stopReconfigureThread();
     SecurityContextFactory.get().getSecurityContext().close();
     server.stop();
   }
@@ -241,4 +248,26 @@ public class CoordinatorServer {
   private void blockUntilShutdown() throws InterruptedException {
     server.blockUntilShutdown();
   }
+
+  @Override
+  public void reconfigure(RssConf conf) {
+    clusterManager.reconfigure(conf);
+    accessManager.reconfigure(conf);
+  }
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    if (clusterManager.isPropertyReconfigurable(property)) {
+      return true;
+    }
+    if (accessManager.isPropertyReconfigurable(property)) {
+      return true;
+    }
+    return false;
+  }
+
+  @Override
+  public RssConf reloadConfiguration() {
+    return new CoordinatorConf(coordinatorConf.getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME, ""));
+  }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index ae1f6d23..dbe5df33 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
@@ -58,7 +59,7 @@ public class SimpleClusterManager implements ClusterManager {
   private Map<String, Set<ServerNode>> tagToNodes = Maps.newConcurrentMap();
   private AtomicLong excludeLastModify = new AtomicLong(0L);
   private long heartbeatTimeout;
-  private int shuffleNodesMax;
+  private volatile int shuffleNodesMax;
   private ScheduledExecutorService scheduledExecutorService;
   private ScheduledExecutorService checkNodesExecutorService;
   private FileSystem hadoopFileSystem;
@@ -278,4 +279,18 @@ public class SimpleClusterManager implements ClusterManager {
   public void setStartupSilentPeriodEnabled(boolean startupSilentPeriodEnabled) {
     this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
   }
+
+  @Override
+  public void reconfigure(RssConf conf) {
+    int nodeMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+    if (nodeMax != shuffleNodesMax) {
+      LOG.warn("Coordinator update new shuffleNodesMax " + nodeMax);
+      shuffleNodesMax = nodeMax;
+    }
+  }
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
+  }
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
index 773e68a8..67eb584d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/access/checker/AccessClusterLoadChecker.java
@@ -24,6 +24,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.Reconfigurable;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.AccessManager;
 import org.apache.uniffle.coordinator.ClusterManager;
@@ -39,7 +41,7 @@ import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUF
  * AccessClusterLoadChecker use the cluster load metrics including memory and healthy to
  * filter and count available nodes numbers and reject if the number do not reach the threshold.
  */
-public class AccessClusterLoadChecker extends AbstractAccessChecker {
+public class AccessClusterLoadChecker extends AbstractAccessChecker implements Reconfigurable {
 
   private static final Logger LOG = LoggerFactory.getLogger(AccessClusterLoadChecker.class);
 
@@ -47,7 +49,7 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
   private final double memoryPercentThreshold;
   // The hard constraint number of available shuffle servers
   private final int availableServerNumThreshold;
-  private final int defaultRequiredShuffleServerNumber;
+  private volatile int defaultRequiredShuffleServerNumber;
 
   public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     super(accessManager);
@@ -114,4 +116,18 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
 
   public void close() {
   }
+
+  @Override
+  public void reconfigure(RssConf conf) {
+    int nodeMax = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
+    if (nodeMax != defaultRequiredShuffleServerNumber) {
+      LOG.warn("Coordinator update new defaultRequiredShuffleServerNumber " + nodeMax);
+      defaultRequiredShuffleServerNumber = nodeMax;
+    }
+  }
+
+  @Override
+  public boolean isPropertyReconfigurable(String property) {
+    return CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key().equals(property);
+  }
 }
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index ddc7f4dd..d53a5b37 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -106,6 +106,7 @@ This document will introduce how to deploy Uniffle coordinators.
 |rss.coordinator.quota.default.path|-|A configuration file for the number of apps for a user-defined user.|
 |rss.coordinator.quota.default.app.num|5|Default number of apps at user level.|
 |rss.metrics.reporter.class|-|The class of metrics reporter.|
+|rss.reconfigure.interval.sec|5|Reconfigure check interval.|
 
 ### AccessClusterLoadChecker settings
 |Property Name|Default|	Description|
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index e73d8738..cf93d224 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -18,11 +18,14 @@
 package org.apache.uniffle.test;
 
 import java.io.File;
+import java.io.FileWriter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -32,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.config.ReconfigurableBase;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.SimpleClusterManager;
@@ -54,6 +58,9 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
     CoordinatorConf coordinatorConf1 = getCoordinatorConf();
     coordinatorConf1.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
     coordinatorConf1.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
+    coordinatorConf1.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,
+        new File(tmpDir, "coordinator.conf").getPath());
+    coordinatorConf1.setLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC, 1L);
     createCoordinatorServer(coordinatorConf1);
 
     CoordinatorConf coordinatorConf2 = getCoordinatorConf();
@@ -61,6 +68,9 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
     coordinatorConf2.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, SHUFFLE_NODES_MAX);
     coordinatorConf2.setInteger(CoordinatorConf.RPC_SERVER_PORT, COORDINATOR_PORT_2);
     coordinatorConf2.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_2);
+    coordinatorConf2.setString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,
+        new File(tmpDir, "coordinator.conf").getPath());
+    coordinatorConf2.setLong(RssBaseConf.RSS_RECONFIGURE_INTERVAL_SEC, 1L);
     createCoordinatorServer(coordinatorConf2);
 
     for (int i = 0; i < SERVER_NUM; i++) {
@@ -142,4 +152,28 @@ public class CoordinatorAssignmentTest extends CoordinatorTestBase {
     info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM - 1, -1);
     assertEquals(SHUFFLE_NODES_MAX - 1, info.getServerToPartitionRanges().keySet().size());
   }
+
+  @Test
+  public void testReconfigureNodeMax() throws Exception {
+    String fileName = coordinators.get(0).getCoordinatorConf()
+        .getString(ReconfigurableBase.RECONFIGURABLE_FILE_NAME,"");
+    new File(fileName).createNewFile();
+    ShuffleWriteClientImpl shuffleWriteClient = new ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
+        1, 1, 1, true, 1, 1, 10, 10);
+    shuffleWriteClient.registerCoordinators(COORDINATOR_QUORUM);
+    ShuffleAssignmentsInfo info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1);
+    assertEquals(SHUFFLE_NODES_MAX, info.getServerToPartitionRanges().keySet().size());
+    Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+    try (FileWriter fileWriter = new FileWriter(fileName)) {
+      fileWriter.append(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key() + " " + 5);
+    }
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+    info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, SERVER_NUM + 10, -1);
+    assertEquals(5, info.getServerToPartitionRanges().keySet().size());
+    try (FileWriter fileWriter = new FileWriter(fileName)) {
+      fileWriter.append(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX.key() + " " + 10);
+    }
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+  }
+
 }