You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/09/20 02:32:09 UTC

[incubator-uniffle] branch master updated: Use the conf of shuffleNodesNumber from jobs to be as checking factor (#208)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dc7f0db Use the conf of shuffleNodesNumber from jobs to be as checking factor (#208)
9dc7f0db is described below

commit 9dc7f0dbe0d374ba92c1763c4c88e1854e5c6d72
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Tue Sep 20 10:32:05 2022 +0800

    Use the conf of shuffleNodesNumber from jobs to be as checking factor (#208)
    
    ### What changes were proposed in this pull request?
    Use the shuffleNodesNumber conf from jobs as the checking factor.
    
    ### Why are the changes needed?
    PR #97 allows a client to specify the number of shuffle servers it requires, but AccessClusterLoadChecker does not take this into consideration. This PR uses the shuffleNodesNumber conf from the job as the checking factor, but only when the conf `rss.coordinator.access.loadChecker.serverNum.threshold` is not set.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    
    ### How was this patch tested?
    UTs.
---
 .../spark/shuffle/DelegationRssShuffleManager.java | 10 ++-
 .../spark/shuffle/DelegationRssShuffleManager.java | 10 ++-
 .../org/apache/uniffle/common/util/Constants.java  |  2 +
 .../coordinator/AccessClusterLoadChecker.java      | 41 +++++++---
 .../uniffle/coordinator/CoordinatorConf.java       |  6 +-
 .../coordinator/AccessClusterLoadCheckerTest.java  | 89 +++++++++++++++++++++-
 docs/coordinator_guide.md                          |  2 +-
 7 files changed, 144 insertions(+), 16 deletions(-)

diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 1d69d9a6..6182ecb3 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -18,9 +18,11 @@
 package org.apache.spark.shuffle;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -36,6 +38,8 @@ import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
 
+import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+
 public class DelegationRssShuffleManager implements ShuffleManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);
@@ -99,13 +103,17 @@ public class DelegationRssShuffleManager implements ShuffleManager {
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
 
+    int assignmentShuffleNodesNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    Map<String, String> extraProperties = Maps.newHashMap();
+    extraProperties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
+
     for (CoordinatorClient coordinatorClient : coordinatorClients) {
       Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
       boolean canAccess;
       try {
         canAccess = RetryUtils.retry(() -> {
           RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
-              accessId, assignmentTags, accessTimeoutMs));
+              accessId, assignmentTags, accessTimeoutMs, extraProperties));
           if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
             LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
             return true;
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 7c76e200..5078c708 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -18,9 +18,11 @@
 package org.apache.spark.shuffle;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.spark.ShuffleDependency;
 import org.apache.spark.SparkConf;
@@ -36,6 +38,8 @@ import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.RetryUtils;
 
+import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+
 public class DelegationRssShuffleManager implements ShuffleManager {
 
   private static final Logger LOG = LoggerFactory.getLogger(DelegationRssShuffleManager.class);
@@ -99,13 +103,17 @@ public class DelegationRssShuffleManager implements ShuffleManager {
     long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
     int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
 
+    int assignmentShuffleNodesNum = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_SHUFFLE_SERVER_NUMBER);
+    Map<String, String> extraProperties = Maps.newHashMap();
+    extraProperties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, String.valueOf(assignmentShuffleNodesNum));
+
     for (CoordinatorClient coordinatorClient : coordinatorClients) {
       Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
       boolean canAccess;
       try {
         canAccess = RetryUtils.retry(() -> {
           RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
-              accessId, assignmentTags, accessTimeoutMs));
+              accessId, assignmentTags, accessTimeoutMs, extraProperties));
           if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
             LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
             return true;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index a53edd01..b480ce58 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -43,4 +43,6 @@ public class Constants {
   public static final String CONF_REMOTE_STORAGE_PATH = ".remote.storage.path";
   public static final String RSS_CLIENT_CONF_REMOTE_STORAGE_PATH =
           RSS_CLIENT_CONF_COMMON_PREFIX + CONF_REMOTE_STORAGE_PATH;
+
+  public static final String ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM = "access_info_required_shuffle_nodes_num";
 }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
index e388d8b5..f460a985 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/AccessClusterLoadChecker.java
@@ -20,11 +20,14 @@ package org.apache.uniffle.coordinator;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.util.Constants;
 
+import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+
 /**
  * AccessClusterLoadChecker use the cluster load metrics including memory and healthy to
  * filter and count available nodes numbers and reject if the number do not reach the threshold.
@@ -35,7 +38,9 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
 
   private final ClusterManager clusterManager;
   private final double memoryPercentThreshold;
+  // The hard constraint number of available shuffle servers
   private final int availableServerNumThreshold;
+  private final int defaultRequiredShuffleServerNumber;
 
   public AccessClusterLoadChecker(AccessManager accessManager) throws Exception {
     super(accessManager);
@@ -44,24 +49,40 @@ public class AccessClusterLoadChecker extends AbstractAccessChecker {
     this.memoryPercentThreshold = conf.getDouble(CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE);
     this.availableServerNumThreshold = conf.getInteger(
         CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_SERVER_NUM_THRESHOLD,
-        conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX));
+        -1
+    );
+    this.defaultRequiredShuffleServerNumber = conf.get(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
   }
 
   public AccessCheckResult check(AccessInfo accessInfo) {
     Set<String> tags = accessInfo.getTags();
     List<ServerNode> servers = clusterManager.getServerList(tags);
     int size = (int) servers.stream().filter(ServerNode::isHealthy).filter(this::checkMemory).count();
-    if (size >= availableServerNumThreshold) {
+
+    // If the hard constraint number exist, directly check it
+    if (availableServerNumThreshold != -1 && size >= availableServerNumThreshold) {
       return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
-    } else {
-      String msg = String.format("Denied by AccessClusterLoadChecker accessInfo[%s], "
-              + "total %s nodes, %s available nodes, "
-              + "memory percent threshold %s, available num threshold %s.",
-          accessInfo, servers.size(), size, memoryPercentThreshold, availableServerNumThreshold);
-      LOG.warn(msg);
-      CoordinatorMetrics.counterTotalLoadDeniedRequest.inc();
-      return new AccessCheckResult(false, msg);
     }
+
+    // If the hard constraint is missing, check the available servers number meet the job's required server size
+    if (availableServerNumThreshold == -1) {
+      String requiredNodesNumRaw = accessInfo.getExtraProperties().get(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM);
+      int requiredNodesNum = defaultRequiredShuffleServerNumber;
+      if (StringUtils.isNotEmpty(requiredNodesNumRaw) && Integer.parseInt(requiredNodesNumRaw) > 0) {
+        requiredNodesNum = Integer.parseInt(requiredNodesNumRaw);
+      }
+      if (size >= requiredNodesNum) {
+        return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
+      }
+    }
+
+    String msg = String.format("Denied by AccessClusterLoadChecker accessInfo[%s], "
+            + "total %s nodes, %s available nodes, "
+            + "memory percent threshold %s, available num threshold %s.",
+        accessInfo, servers.size(), size, memoryPercentThreshold, availableServerNumThreshold);
+    LOG.warn(msg);
+    CoordinatorMetrics.counterTotalLoadDeniedRequest.inc();
+    return new AccessCheckResult(false, msg);
   }
 
   private boolean checkMemory(ServerNode serverNode) {
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index 28931a42..b4b4126a 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -102,7 +102,11 @@ public class CoordinatorConf extends RssBaseConf {
       .intType()
       .checkValue(ConfigUtils.POSITIVE_INTEGER_VALIDATOR_2, "load checker serverNum threshold must be positive")
       .noDefaultValue()
-      .withDescription("The minimal required number of healthy shuffle servers when being accessed by client");
+      .withDescription(
+          "The minimal required number of healthy shuffle servers when being accessed by client. "
+          + "And when not specified, it will use the required shuffle-server number from client as the checking "
+          + "condition. If there is no client shuffle-server number specified, the coordinator conf "
+          + "of rss.coordinator.shuffle.nodes.max will be adopted");
   public static final ConfigOption<Boolean> COORDINATOR_DYNAMIC_CLIENT_CONF_ENABLED = ConfigOptions
       .key("rss.coordinator.dynamicClientConf.enabled")
       .booleanType()
diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
index eb140e2a..18542162 100644
--- a/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
+++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/AccessClusterLoadCheckerTest.java
@@ -17,7 +17,11 @@
 
 package org.apache.uniffle.coordinator;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 import com.google.common.collect.Lists;
@@ -26,6 +30,10 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import static org.apache.uniffle.common.util.Constants.ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM;
+import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
+import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE;
+import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -46,7 +54,84 @@ public class AccessClusterLoadCheckerTest {
   }
 
   @Test
-  public void test() throws Exception {
+  public void testAccessInfoRequiredShuffleServers() throws Exception {
+    List<ServerNode> nodes = Lists.newArrayList();
+    ServerNode node1 = new ServerNode(
+        "1",
+        "1",
+        0,
+        50,
+        20,
+        1000,
+        0,
+        null,
+        true);
+    ServerNode node2 = new ServerNode(
+        "1",
+        "1",
+        0,
+        50,
+        20,
+        1000,
+        0,
+        null,
+        true);
+    nodes.add(node1);
+    nodes.add(node2);
+
+    ClusterManager clusterManager = mock(SimpleClusterManager.class);
+    when(clusterManager.getServerList(any())).thenReturn(nodes);
+
+    CoordinatorConf conf = new CoordinatorConf();
+    conf.set(COORDINATOR_ACCESS_CHECKERS, Arrays.asList("org.apache.uniffle.coordinator.AccessClusterLoadChecker"));
+    conf.set(COORDINATOR_SHUFFLE_NODES_MAX, 3);
+    conf.set(COORDINATOR_ACCESS_LOADCHECKER_MEMORY_PERCENTAGE, 20.0);
+
+    AccessManager accessManager = new AccessManager(conf, clusterManager, new Configuration());
+
+    AccessClusterLoadChecker accessClusterLoadChecker =
+        (AccessClusterLoadChecker) accessManager.getAccessCheckers().get(0);
+
+    /**
+     * case1:
+     * when setting the invalid required shuffle nodes number of job and available servers less than
+     * the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+     */
+    Map<String, String> properties = new HashMap<>();
+    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "-1");
+    AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+    /**
+     * case2:
+     * when setting the valid required shuffle nodes number of job and available servers greater than
+     * the COORDINATOR_SHUFFLE_NODES_MAX, it should return true
+     */
+    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "1");
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    assertTrue(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+    /**
+     * case3:
+     * when setting the valid required shuffle nodes number of job and available servers less than
+     * the COORDINATOR_SHUFFLE_NODES_MAX, it should return false
+     */
+    properties.put(ACCESS_INFO_REQUIRED_SHUFFLE_NODES_NUM, "100");
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+
+    /**
+     * case4:
+     * when the required shuffle nodes number is not specified in access info, it should use the
+     * default shuffle nodes max from coordinator conf.
+     */
+    properties = new HashMap<>();
+    accessInfo = new AccessInfo("test", new HashSet<>(), properties);
+    assertFalse(accessClusterLoadChecker.check(accessInfo).isSuccess());
+  }
+
+  @Test
+  public void testWhenAvailableServerThresholdSpecified() throws Exception {
     ClusterManager clusterManager = mock(SimpleClusterManager.class);
     List<ServerNode> serverNodeList = Lists.newArrayList();
     ServerNode node1 = new ServerNode(
@@ -63,7 +148,7 @@ public class AccessClusterLoadCheckerTest {
     final String filePath = Objects.requireNonNull(
         getClass().getClassLoader().getResource("coordinator.conf")).getFile();
     CoordinatorConf conf = new CoordinatorConf(filePath);
-    conf.setString(CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
+    conf.setString(COORDINATOR_ACCESS_CHECKERS.key(),
         "org.apache.uniffle.coordinator.AccessClusterLoadChecker");
     AccessManager accessManager = new AccessManager(conf, clusterManager, new Configuration());
     AccessClusterLoadChecker accessClusterLoadChecker =
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index f9c263a2..7a98f708 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -103,7 +103,7 @@ This document will introduce how to deploy Uniffle coordinators.
 ### AccessClusterLoadChecker settings
 |Property Name|Default|	Description|
 |---|---|---|
-|rss.coordinator.access.loadChecker.serverNum.threshold|-|The minimal required number of healthy shuffle servers when being accessed by client|
+|rss.coordinator.access.loadChecker.serverNum.threshold|-|The minimal required number of healthy shuffle servers when being accessed by client. And when not specified, it will use the required shuffle-server number from client as the checking condition. If there is no client shuffle-server number specified, the coordinator conf of rss.coordinator.shuffle.nodes.max will be adopted|
 
 ### AccessCandidatesChecker settings
 AccessCandidatesChecker is one of the built-in access checker, which will allow user to define the candidates list to use rss.