You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/01/10 02:38:25 UTC

[incubator-uniffle] branch master updated: [ISSUE-456] Avoid removing resources for multiple times (#459)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f166f46 [ISSUE-456] Avoid removing resources for multiple times (#459)
3f166f46 is described below

commit 3f166f469a54d55c9abd5e3202a99dcd3f1440d9
Author: xianjingfeng <58...@qq.com>
AuthorDate: Tue Jan 10 10:38:20 2023 +0800

    [ISSUE-456] Avoid removing resources for multiple times (#459)
    
    ### What changes were proposed in this pull request?
    If the resources for an appId have already been removed, skip removing them a second time.
    
    ### Why are the changes needed?
    When `removeResources` takes too long for some appIds, the `expiredAppCleanupExecutorService` in ShuffleTaskManager may detect the same appId as expired more than once.
    As a result, the same appId might be added to `expiredAppIdQueue` multiple times. This PR fixes #456.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test (`ShuffleTaskManagerTest#clearMultiTimesTest`).
---
 .../apache/uniffle/server/ShuffleTaskManager.java  |  8 ++-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 57 +++++++++++++++++++++-
 2 files changed, 62 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index c753d941..cb9e8e3f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -534,7 +534,12 @@ public class ShuffleTaskManager {
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
     final long start = System.currentTimeMillis();
-    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shuffleTaskInfos.get(appId).getCachedBlockIds();
+    ShuffleTaskInfo shffleTaskInfo = shuffleTaskInfos.remove(appId);
+    if (shffleTaskInfo == null) {
+      LOG.info("Resource for appId[" + appId + "] had been removed before.");
+      return;
+    }
+    final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shffleTaskInfo.getCachedBlockIds();
     partitionsToBlockIds.remove(appId);
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
@@ -543,7 +548,6 @@ public class ShuffleTaskManager {
           new AppPurgeEvent(appId, getUserByAppId(appId), new ArrayList<>(shuffleToCachedBlockIds.keySet()))
       );
     }
-    shuffleTaskInfos.remove(appId);
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
   }
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 16344736..1dc6eb58 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
@@ -34,6 +36,7 @@ import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -512,6 +515,57 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty());
   }
 
+
+  @Test
+  public void clearMultiTimesTest() throws Exception { // calls removeResources for one appId from three threads
+    ShuffleServerConf conf = new ShuffleServerConf();
+    String storageBasePath = HDFS_URI + "rss/clearTest";
+    final int shuffleId = 1;
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
+    conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    String appId = "clearMultiTimesTest";
+    shuffleTaskManager.registerShuffle(
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
+    shuffleTaskManager.refreshAppId(appId);
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+    
+    shuffleTaskManager.checkResourceStatus();
+    assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds()); // app is still listed after the status check
+
+    CountDownLatch countDownLatch = new CountDownLatch(3); // one count per thread started below
+    for (int i = 0; i < 3; i++) {
+      new Thread(() -> {
+        try {
+          shuffleTaskManager.removeResources(appId); // same appId in every thread
+        } finally {
+          countDownLatch.countDown(); // in finally, so await() below is released even if removeResources throws
+        }
+      }).start();
+    }
+    countDownLatch.await(); // block until all three threads have finished
+    assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds()); // no appIds remain
+    assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty()); // nothing cached is reported for the removed app
+  }
+
   @Test
   public void getBlockIdsByPartitionIdTest() {
     ShuffleServerConf conf = new ShuffleServerConf();
@@ -753,7 +807,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     assertTrue(appIdsOnDisk.contains(appId));
 
     // make sure heartbeat timeout and resources are removed
-    Thread.sleep(5000);
+    Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+        () -> shuffleTaskManager.getAppIds().size() == 0);
 
     // Create the hidden dir to simulate LocalStorageChecker's check
     String storageDir = tempDir.getAbsolutePath();