You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2022/11/18 01:41:50 UTC

[incubator-uniffle] branch master updated: [ISSUE-328] Cleanup unused shuffle servers to avoid app heartbeat after stage completed (#334)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new b98fcf2c [ISSUE-328] Cleanup unused shuffle servers to avoid app heartbeat after stage completed (#334)
b98fcf2c is described below

commit b98fcf2c7e897898c36c7184ca7e624e1eda60a0
Author: xianjingfeng <58...@qq.com>
AuthorDate: Fri Nov 18 09:41:45 2022 +0800

    [ISSUE-328] Cleanup unused shuffle servers to avoid app heartbeat after stage completed (#334)
    
    ### What changes were proposed in this pull request?
    
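    Clean up unused shuffle servers after a stage completes: track the registered shuffle servers per app and per shuffle, and stop tracking a shuffle's servers once that shuffle is unregistered.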
    
    ### Why are the changes needed?
    
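    If an application has many stages, the Spark driver keeps sending heartbeats to every shuffle server it has ever registered a shuffle with, even after the stages using those servers have completed, which may cause the app to expire on the shuffle-server side. And if decommissioning is supported in the future, these heartbeats would make it difficult for a shuffle server to exit. #328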
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added a unit test: `ShuffleWriteClientImplTest#testRegisterAndUnRegisterShuffleServer`.
---
 .../client/impl/ShuffleWriteClientImpl.java        | 57 ++++++++++++++++++++--
 .../client/impl/ShuffleWriteClientImplTest.java    | 20 ++++++++
 2 files changed, 72 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index e44b9cd1..3bccd467 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.client.impl;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -86,7 +87,8 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   private int retryMax;
   private long retryIntervalMax;
   private List<CoordinatorClient> coordinatorClients = Lists.newLinkedList();
-  private Set<ShuffleServerInfo> shuffleServerInfoSet = Sets.newConcurrentHashSet();
+  //appId -> shuffleId -> servers
+  private Map<String, Map<Integer, Set<ShuffleServerInfo>>> shuffleServerInfoMap = Maps.newConcurrentMap();
   private CoordinatorClientFactory coordinatorClientFactory;
   private ExecutorService heartBeatExecutorService;
   private int replica;
@@ -350,7 +352,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     String msg = "Error happened when registerShuffle with appId[" + appId + "], shuffleId[" + shuffleId
         + "], " + shuffleServerInfo;
     throwExceptionIfNecessary(response, msg);
-    shuffleServerInfoSet.add(shuffleServerInfo);
+    addShuffleServer(appId, shuffleId, shuffleServerInfo);
   }
 
   @Override
@@ -551,7 +553,8 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
   public void sendAppHeartbeat(String appId, long timeoutMs) {
     RssAppHeartBeatRequest request = new RssAppHeartBeatRequest(appId, timeoutMs);
     List<Callable<Void>> callableList = Lists.newArrayList();
-    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+    Set<ShuffleServerInfo> allShuffleServers = getAllShuffleServers(appId);
+    allShuffleServers.forEach(shuffleServerInfo -> {
           callableList.add(() -> {
             try {
               ShuffleServerClient client =
@@ -607,7 +610,16 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     RssUnregisterShuffleRequest request = new RssUnregisterShuffleRequest(appId, shuffleId);
     List<Callable<Void>> callableList = Lists.newArrayList();
 
-    shuffleServerInfoSet.stream().forEach(shuffleServerInfo -> {
+    Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
+    if (appServerMap == null) {
+      return;
+    }
+    Set<ShuffleServerInfo> shuffleServerInfos = appServerMap.get(shuffleId);
+    if (shuffleServerInfos == null) {
+      return;
+    }
+
+    shuffleServerInfos.forEach(shuffleServerInfo -> {
           callableList.add(() -> {
             try {
               ShuffleServerClient client =
@@ -628,7 +640,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     try {
       executorService =
           Executors.newFixedThreadPool(
-              Math.min(unregisterThreadPoolSize, shuffleServerInfoSet.size()),
+              Math.min(unregisterThreadPoolSize, shuffleServerInfos.size()),
               ThreadUtils.getThreadFactory("unregister-shuffle-%d")
           );
       List<Future<Void>> futures = executorService.invokeAll(callableList, unregisterRequestTimeSec, TimeUnit.SECONDS);
@@ -643,6 +655,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
       if (executorService != null) {
         executorService.shutdownNow();
       }
+      removeShuffleServer(appId, shuffleId);
     }
   }
 
@@ -653,9 +666,51 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     }
   }
 
+  /**
+   * Returns the union of the shuffle servers tracked for every still-registered shuffle of
+   * {@code appId}, or an empty set if none are tracked. Callers should treat it as read-only.
+   */
+  Set<ShuffleServerInfo> getAllShuffleServers(String appId) {
+    Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
+    if (appServerMap == null) {
+      return Collections.emptySet();
+    }
+    // One server may host several shuffles of the same app; collecting into a set means
+    // callers such as sendAppHeartbeat contact each server only once.
+    Set<ShuffleServerInfo> serverInfos = Sets.newHashSet();
+    appServerMap.values().forEach(serverInfos::addAll);
+    return serverInfos;
+  }
+
   @VisibleForTesting
   public ShuffleServerClient getShuffleServerClient(ShuffleServerInfo shuffleServerInfo) {
     return ShuffleServerClientFactory.getInstance().getShuffleServerClient(clientType, shuffleServerInfo);
   }
 
+  /**
+   * Records that {@code serverInfo} was registered for shuffle {@code shuffleId} of {@code appId}.
+   */
+  @VisibleForTesting
+  void addShuffleServer(String appId, int shuffleId, ShuffleServerInfo serverInfo) {
+    // computeIfAbsent on the concurrent maps makes each get-or-create step atomic. A separate
+    // get()/put() pair lets two concurrent registrations both see "absent"; the later put() then
+    // replaces the earlier map/set and silently drops a server that would never get heartbeats.
+    shuffleServerInfoMap
+        .computeIfAbsent(appId, key -> Maps.newConcurrentMap())
+        .computeIfAbsent(shuffleId, key -> Sets.newConcurrentHashSet())
+        .add(serverInfo);
+  }
+
+  /**
+   * Forgets the servers tracked for shuffle {@code shuffleId} of {@code appId}; servers tracked
+   * for the app's other shuffles are kept. The per-app map is left in place even when it becomes
+   * empty, since pruning it without further coordination could race with addShuffleServer.
+   */
+  @VisibleForTesting
+  void removeShuffleServer(String appId, int shuffleId) {
+    Map<Integer, Set<ShuffleServerInfo>> appServerMap = shuffleServerInfoMap.get(appId);
+    if (appServerMap != null) {
+      appServerMap.remove(shuffleId);
+    }
+  }
 }
diff --git a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
index 71fdc637..414d203f 100644
--- a/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
+++ b/client/src/test/java/org/apache/uniffle/client/impl/ShuffleWriteClientImplTest.java
@@ -30,6 +30,7 @@ import org.apache.uniffle.client.response.SendShuffleDataResult;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleServerInfo;
 
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doReturn;
@@ -56,4 +57,26 @@ public class ShuffleWriteClientImplTest {
 
     assertTrue(result.getFailedBlockIds().contains(10L));
   }
+
+  @Test
+  public void testRegisterAndUnRegisterShuffleServer() {
+    ShuffleWriteClientImpl shuffleWriteClient =
+        new ShuffleWriteClientImpl("GRPC", 3, 2000, 4, 1, 1, 1, true, 1, 1, 10, 10);
+    String appId1 = "testRegisterAndUnRegisterShuffleServer-1";
+    String appId2 = "testRegisterAndUnRegisterShuffleServer-2";
+    ShuffleServerInfo server1 = new ShuffleServerInfo("host1-0", "host1", 0);
+    ShuffleServerInfo server2 = new ShuffleServerInfo("host2-0", "host2", 0);
+    ShuffleServerInfo server3 = new ShuffleServerInfo("host3-0", "host3", 0);
+    shuffleWriteClient.addShuffleServer(appId1, 0, server1);
+    shuffleWriteClient.addShuffleServer(appId1, 1, server2);
+    shuffleWriteClient.addShuffleServer(appId2, 1, server3);
+    assertEquals(2, shuffleWriteClient.getAllShuffleServers(appId1).size());
+    assertEquals(1, shuffleWriteClient.getAllShuffleServers(appId2).size());
+    // server1 is now used by both shuffles of appId1; it must stay tracked via shuffle 0.
+    shuffleWriteClient.addShuffleServer(appId1, 1, server1);
+    shuffleWriteClient.unregisterShuffle(appId1, 1);
+    assertEquals(java.util.Collections.singleton(server1), shuffleWriteClient.getAllShuffleServers(appId1));
+    // Unregistering appId1's shuffle 1 must not touch appId2's shuffle 1.
+    assertEquals(java.util.Collections.singleton(server3), shuffleWriteClient.getAllShuffleServers(appId2));
+  }
 }