You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/08 06:04:59 UTC
[incubator-uniffle] branch master updated: [ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2c6705c [ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)
2c6705c is described below
commit 2c6705c05ef6e76933f23ba4b1cfded12d18d271
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Mon Aug 8 14:04:55 2022 +0800
[ISSUE-127][IMPROVEMENT] Add timeout reconnection when DelegationRssShuffleManager send the request of AccessCluster (#139)
### What changes were proposed in this pull request?
To solve issue [#127](https://github.com/apache/incubator-uniffle/issues/127)
### Why are the changes needed?
Avoids falling back to SortShuffleManager in transient situations such as coordinator memory shortage; retrying ensures that tasks run in the RSS cluster as much as possible.
### Does this PR introduce _any_ user-facing change?
Yes. Two new client-side parameters are added: `spark.rss.client.access.retry.times` (the number of retry attempts) and `spark.rss.client.access.retry.interval.ms` (the interval between retries, in milliseconds). Users can tune these two parameters within their expected time budget to make tasks run in the RSS cluster as much as possible.
### How was this patch tested?
UT
---
.../org/apache/spark/shuffle/RssSparkConfig.java | 12 +++++-
.../spark/shuffle/DelegationRssShuffleManager.java | 37 ++++++++++--------
.../shuffle/DelegationRssShuffleManagerTest.java | 45 ++++++++++++++++++++++
.../spark/shuffle/DelegationRssShuffleManager.java | 37 ++++++++++--------
.../shuffle/DelegationRssShuffleManagerTest.java | 41 ++++++++++++++++++++
docs/client_guide.md | 2 +
6 files changed, 141 insertions(+), 33 deletions(-)
diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index c546bdc..67e2971 100644
--- a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -214,7 +214,17 @@ public class RssSparkConfig {
public static final ConfigEntry<Integer> RSS_CLIENT_ASSIGNMENT_RETRY_TIMES = createIntegerBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES))
.createWithDefault(RssClientConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES_DEFAULT_VALUE);
-
+
+ public static final ConfigEntry<Long> RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS = createLongBuilder(
+ new ConfigBuilder("spark.rss.client.access.retry.interval.ms")
+ .doc("Interval between retries fallback to SortShuffleManager"))
+ .createWithDefault(20000L);
+
+ public static final ConfigEntry<Integer> RSS_CLIENT_ACCESS_RETRY_TIMES = createIntegerBuilder(
+ new ConfigBuilder("spark.rss.client.access.retry.times")
+ .doc("Number of retries fallback to SortShuffleManager"))
+ .createWithDefault(0);
+
public static final ConfigEntry<String> RSS_COORDINATOR_QUORUM = createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX + RssClientConfig.RSS_COORDINATOR_QUORUM)
.doc("Coordinator quorum"))
diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 7ff9467..1d69d9a 100644
--- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
public class DelegationRssShuffleManager implements ShuffleManager {
@@ -95,25 +96,29 @@ public class DelegationRssShuffleManager implements ShuffleManager {
LOG.warn("Access id key is empty");
return false;
}
+ long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+ int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
for (CoordinatorClient coordinatorClient : coordinatorClients) {
+ Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+ boolean canAccess;
try {
- Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-
- RssAccessClusterResponse response =
- coordinatorClient.accessCluster(new RssAccessClusterRequest(
- accessId, assignmentTags, accessTimeoutMs));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
- LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
- return true;
- } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
- LOG.warn("Request to access cluster {} is denied using {} for {}",
- coordinatorClient.getDesc(), accessId, response.getMessage());
- return false;
- } else {
- LOG.warn("Fail to reach cluster {} for {}", coordinatorClient.getDesc(), response.getMessage());
- }
- } catch (Exception e) {
+ canAccess = RetryUtils.retry(() -> {
+ RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
+ accessId, assignmentTags, accessTimeoutMs));
+ if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
+ return true;
+ } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+ throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+ + accessId + " for " + response.getMessage());
+ } else {
+ throw new RssException("Fail to reach cluster " + coordinatorClient.getDesc()
+ + " for " + response.getMessage());
+ }
+ }, retryInterval, retryTimes);
+ return canAccess;
+ } catch (Throwable e) {
LOG.warn("Fail to access cluster {} using {} for {}",
coordinatorClient.getDesc(), accessId, e.getMessage());
}
diff --git a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 6f3d35b..dca71bb 100644
--- a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -19,8 +19,12 @@ package org.apache.spark.shuffle;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Set;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.uniffle.client.request.RssAccessClusterRequest;
+import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.storage.util.StorageType;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.sort.SortShuffleManager;
@@ -32,6 +36,8 @@ import org.mockito.Mockito;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RetryUtils;
import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -134,6 +140,45 @@ public class DelegationRssShuffleManagerTest {
assertTrue(hasException);
}
+ @Test
+ public void testTryAccessCluster() throws Exception {
+ CoordinatorClient mockDeniedCoordinatorClient = mock(CoordinatorClient.class);
+ when(mockDeniedCoordinatorClient.accessCluster(any()))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(SUCCESS, ""));
+ List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
+ coordinatorClients.add(mockDeniedCoordinatorClient);
+ mockedStaticRssShuffleUtils.when(() ->
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(coordinatorClients);
+ SparkConf conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateRssShuffleManager(conf);
+
+ CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
+ when(mockCoordinatorClient.accessCluster(any()))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
+ List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
+ SecondCoordinatorClients.add(mockCoordinatorClient);
+ mockedStaticRssShuffleUtils.when(() ->
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
+ SparkConf SecondConf = new SparkConf();
+ SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateSortShuffleManager(SecondConf);
+ }
+
private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true);
assertTrue(delegationRssShuffleManager.getDelegate() instanceof SortShuffleManager);
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
index 33d8833..7c76e20 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/DelegationRssShuffleManager.java
@@ -34,6 +34,7 @@ import org.apache.uniffle.client.response.ResponseStatusCode;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.Constants;
+import org.apache.uniffle.common.util.RetryUtils;
public class DelegationRssShuffleManager implements ShuffleManager {
@@ -95,25 +96,29 @@ public class DelegationRssShuffleManager implements ShuffleManager {
LOG.warn("Access id key is empty");
return false;
}
+ long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS);
+ int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES);
for (CoordinatorClient coordinatorClient : coordinatorClients) {
+ Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
+ boolean canAccess;
try {
- Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf);
-
- RssAccessClusterResponse response =
- coordinatorClient.accessCluster(new RssAccessClusterRequest(
- accessId, assignmentTags, accessTimeoutMs));
- if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
- LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
- return true;
- } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
- LOG.warn("Request to access cluster {} is denied using {} for {}",
- coordinatorClient.getDesc(), accessId, response.getMessage());
- return false;
- } else {
- LOG.warn("Fail to reach cluster {} for {}", coordinatorClient.getDesc(), response.getMessage());
- }
- } catch (Exception e) {
+ canAccess = RetryUtils.retry(() -> {
+ RssAccessClusterResponse response = coordinatorClient.accessCluster(new RssAccessClusterRequest(
+ accessId, assignmentTags, accessTimeoutMs));
+ if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
+ LOG.warn("Success to access cluster {} using {}", coordinatorClient.getDesc(), accessId);
+ return true;
+ } else if (response.getStatusCode() == ResponseStatusCode.ACCESS_DENIED) {
+ throw new RssException("Request to access cluster " + coordinatorClient.getDesc() + " is denied using "
+ + accessId + " for " + response.getMessage());
+ } else {
+ throw new RssException("Fail to reach cluster " + coordinatorClient.getDesc()
+ + " for " + response.getMessage());
+ }
+ }, retryInterval, retryTimes);
+ return canAccess;
+ } catch (Throwable e) {
LOG.warn("Fail to access cluster {} using {} for {}",
coordinatorClient.getDesc(), accessId, e.getMessage());
}
diff --git a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 6382e0a..9682b4b 100644
--- a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++ b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -33,6 +33,8 @@ import org.mockito.Mockito;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.RetryUtils;
import static org.apache.uniffle.client.response.ResponseStatusCode.ACCESS_DENIED;
import static org.apache.uniffle.client.response.ResponseStatusCode.SUCCESS;
@@ -135,6 +137,45 @@ public class DelegationRssShuffleManagerTest {
assertTrue(hasException);
}
+ @Test
+ public void testTryAccessCluster() throws Exception {
+ CoordinatorClient mockDeniedCoordinatorClient = mock(CoordinatorClient.class);
+ when(mockDeniedCoordinatorClient.accessCluster(any()))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(SUCCESS, ""));
+ List<CoordinatorClient> coordinatorClients = Lists.newArrayList();
+ coordinatorClients.add(mockDeniedCoordinatorClient);
+ mockedStaticRssShuffleUtils.when(() ->
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(coordinatorClients);
+ SparkConf conf = new SparkConf();
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ conf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateRssShuffleManager(conf);
+
+ CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
+ when(mockCoordinatorClient.accessCluster(any()))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""))
+ .thenReturn(new RssAccessClusterResponse(ACCESS_DENIED, ""));
+ List<CoordinatorClient> SecondCoordinatorClients = Lists.newArrayList();
+ SecondCoordinatorClients.add(mockCoordinatorClient);
+ mockedStaticRssShuffleUtils.when(() ->
+ RssSparkShuffleUtils.createCoordinatorClients(any())).thenReturn(SecondCoordinatorClients);
+ SparkConf SecondConf = new SparkConf();
+ SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_INTERVAL_MS, 3000L);
+ SecondConf.set(RssSparkConfig.RSS_CLIENT_ACCESS_RETRY_TIMES, 3);
+ SecondConf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
+ SecondConf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
+ SecondConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
+ SecondConf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ assertCreateSortShuffleManager(SecondConf);
+ }
+
private DelegationRssShuffleManager assertCreateSortShuffleManager(SparkConf conf) throws Exception {
DelegationRssShuffleManager delegationRssShuffleManager = new DelegationRssShuffleManager(conf, true);
assertTrue(delegationRssShuffleManager.getDelegate() instanceof SortShuffleManager);
diff --git a/docs/client_guide.md b/docs/client_guide.md
index b97474e..b4c1219 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -113,6 +113,8 @@ Other configuration:
|Property Name|Default|Description|
|---|---|---|
|spark.rss.access.timeout.ms|10000|The timeout to access Uniffle coordinator|
+|spark.rss.client.access.retry.interval.ms|20000|The interval between retries fallback to SortShuffleManager|
+|spark.rss.client.access.retry.times|0|The number of retries fallback to SortShuffleManager|
### Client Quorum Setting