You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/08 06:04:59 UTC

[incubator-uniffle] branch master updated: [ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c6705c  [ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)
2c6705c is described below

commit 2c6705c05ef6e76933f23ba4b1cfded12d18d271
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Mon Aug 8 14:04:55 2022 +0800

    [ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)
    
    ### What changes were proposed in this pull request?
    To solve issue [#127](https://github.com/apache/incubator-uniffle/issues/127)
    
    ### Why are the changes needed?
    Avoids unnecessary fallback to SortShuffleManager when the coordinator is temporarily short of memory: the client retries the access request so that tasks run in the RSS cluster whenever possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, two new parameters are added on the client side: `spark.rss.client.access.retry.times`, the number of retry reconnections, and `spark.rss.client.access.retry.interval.ms`, the reconnection interval in milliseconds. Users can set these two parameters within their expected time budget to make the task run in the RSS cluster as much as possible.
    
    ### How was this patch tested?
    UT
---
 .../org/apache/spark/shuffle/RssSparkConfig.java   | 12 +++++-
 .../spark/shuffle/DelegationRssShuffleManager.java | 37 ++++++++++--------
 .../shuffle/DelegationRssShuffleManagerTest.java   | 45 ++++++++++++++++++++++
 .../spark/shuffle/DelegationRssShuffleManager.java | 37 ++++++++++--------
 .../shuffle/DelegationRssShuffleManagerTest.java   | 41 ++++++++++++++++++++
 docs/client_guide.md                               |  2 +
 6 files changed, 141 insertions(+), 33 deletions(-)

diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index c546bdc..67e2971 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -214,7 +214,17 @@ public class RssSparkConfig {
   public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = createIntegerBuilder(
           new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES))
           .createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
-  
+
+  public static final ConfigEntry<Long> RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS = createLongBuilder(
+      new ConfigBuilder("spark.rss.client.access.retry.interval.ms")
+          .doc("Interval between retries fallback to SortShuffleManager"))
+      .createWithDefault(20000L);
+
+  public static final ConfigEntry<Integer> RSS_CLIENT_ACCESS_RETRY_TIMES = createIntegerBuilder(
+      new ConfigBuilder("spark.rss.client.access.retry.times")
+          .doc("Number of retries fallback to SortShuffleManager"))
+      .createWithDefault(0);
+
   public static final ConfigEntry<String> RSS_COORDINATOR_QUORUM = createStringBuilder(
       new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM)
           .doc("Coordinator quorum"))
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 7ff9467..1d69d9a 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class DelegationRssShuffleManager implements ShuffleManager {
 
@@ -95,25 +96,29 @@ public class DelegationRssShuffleManager implements ShuffleManager {
       LOG.warn("Access id key is empty");
       return false;
     }
+    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
 
     for (CoordinatorClient coordinatorClient : coordinatorClients) {
+      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+      boolean canAccess;
       try {
-        Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-
-        RssAccessClusterResponse response =
-            coordinatorClient.accessCluster(new RssAccessClusterRequest(
-                accessId, assignmentTags, accessTimeoutMs));
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
-          return true;
-        } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
-          LOG.warn("Request to access cluster {} is denied using {} for {}",
-              coordinatorClient.getDesc(), accessId, response.getMessage());
-          return false;
-        } else {
-          LOG.warn("Fail to reach cluster {} for {}", coordinatorClient.getDesc(), response.getMessage());
-        }
-      } catch (Exception e) {
+        canAccess = RetryUtils.retry(() -> {
+          RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
+              accessId, assignmentTags, accessTimeoutMs));
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
+            return true;
+          } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+            throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+                + accessId + " for " + response.getMessage());
+          } else {
+            throw new RssException("Fail to reach cluster " + coordinatorClient.getDesc()
+                + " for " + response.getMessage());
+          }
+        }, retryInterval, retryTimes);
+        return canAccess;
+      } catch (Throwable e) {
         LOG.warn("Fail to access cluster {} using {} for {}",
             coordinatorClient.getDesc(), accessId, e.getMessage());
       }
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 6f3d35b..dca71bb 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -19,8 +19,12 @@ package org.apache.spark.shuffle;
 
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.uniffle.client.request.RssAccessClusterRequest;
+import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.storage.util.StorageType;
 import org.apache.spark.SparkConf;
 import org.apache.spark.shuffle.sort.SortShuffleManager;
@@ -32,6 +36,8 @@ import org.mockito.Mockito;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RetryUtils;
 
 import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
 import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -134,6 +140,45 @@ public class DelegationRssShuffleManagerTest {
     assertTrue(hasException);
   }
 
+  @Test
+  public void testTryAccessCluster() throws Exception {
+    CoordinatorClient mockDeniedCoordinatorClient = mock(CoordinatorClient.class);
+    when(mockDeniedCoordinatorClient.accessCluster(any()))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(SUCCESS, ""));
+    List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
+    coordinatorClients.add(mockDeniedCoordinatorClient);
+    mockedStaticRssShuffleUtils.when(() ->
+        RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(coordinatorClients);
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    assertCreateRssShuffleManager(conf);
+
+    CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
+    when(mockCoordinatorClient.accessCluster(any()))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
+    List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
+    SecondCoordinatorClients.add(mockCoordinatorClient);
+    mockedStaticRssShuffleUtils.when(() ->
+        RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
+    SparkConf SecondConf = new SparkConf();
+    SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    assertCreateSortShuffleManager(SecondConf);
+  }
+
   private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
     DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true);
     assertTrue(delegationRssShuffleManager.getDelegate() instanceof SortShuffleManager);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 33d8833..7c76e20 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.client.response.ResponseStatusCode;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
 
 public class DelegationRssShuffleManager implements ShuffleManager {
 
@@ -95,25 +96,29 @@ public class DelegationRssShuffleManager implements ShuffleManager {
       LOG.warn("Access id key is empty");
       return false;
     }
+    long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+    int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
 
     for (CoordinatorClient coordinatorClient : coordinatorClients) {
+      Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+      boolean canAccess;
       try {
-        Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-
-        RssAccessClusterResponse response =
-            coordinatorClient.accessCluster(new RssAccessClusterRequest(
-                accessId, assignmentTags, accessTimeoutMs));
-        if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
-          LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
-          return true;
-        } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
-          LOG.warn("Request to access cluster {} is denied using {} for {}",
-              coordinatorClient.getDesc(), accessId, response.getMessage());
-          return false;
-        } else {
-          LOG.warn("Fail to reach cluster {} for {}", coordinatorClient.getDesc(), response.getMessage());
-        }
-      } catch (Exception e) {
+        canAccess = RetryUtils.retry(() -> {
+          RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
+              accessId, assignmentTags, accessTimeoutMs));
+          if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+            LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
+            return true;
+          } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+            throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+                + accessId + " for " + response.getMessage());
+          } else {
+            throw new RssException("Fail to reach cluster " + coordinatorClient.getDesc()
+                + " for " + response.getMessage());
+          }
+        }, retryInterval, retryTimes);
+        return canAccess;
+      } catch (Throwable e) {
         LOG.warn("Fail to access cluster {} using {} for {}",
             coordinatorClient.getDesc(), accessId, e.getMessage());
       }
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 6382e0a..9682b4b 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -33,6 +33,8 @@ import org.mockito.Mockito;
 
 import org.apache.uniffle.client.api.CoordinatorClient;
 import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RetryUtils;
 
 import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
 import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -135,6 +137,45 @@ public class DelegationRssShuffleManagerTest {
     assertTrue(hasException);
   }
 
+  @Test
+  public void testTryAccessCluster() throws Exception {
+    CoordinatorClient mockDeniedCoordinatorClient = mock(CoordinatorClient.class);
+    when(mockDeniedCoordinatorClient.accessCluster(any()))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(SUCCESS, ""));
+    List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
+    coordinatorClients.add(mockDeniedCoordinatorClient);
+    mockedStaticRssShuffleUtils.when(() ->
+        RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(coordinatorClients);
+    SparkConf conf = new SparkConf();
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    assertCreateRssShuffleManager(conf);
+
+    CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
+    when(mockCoordinatorClient.accessCluster(any()))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+        .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
+    List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
+    SecondCoordinatorClients.add(mockCoordinatorClient);
+    mockedStaticRssShuffleUtils.when(() ->
+        RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
+    SparkConf SecondConf = new SparkConf();
+    SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+    SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+    SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+    SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+    SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+    SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+    assertCreateSortShuffleManager(SecondConf);
+  }
+
   private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
     DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true);
     assertTrue(delegationRssShuffleManager.getDelegate() instanceof SortShuffleManager);
diff --git a/docs/client_guide.md b/docs/client_guide.md
index b97474e..b4c1219 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -113,6 +113,8 @@ Other configuration:
 |Property Name|Default|Description|
 |---|---|---|
 |spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
+|spark.rss.client.access.retry.interval.ms|20000|The interval between retries fallback to SortShuffleManager|
+|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|
   
 
 ### Client Quorum Setting