You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/01/10 02:38:25 UTC
[incubator-uniffle] branch master updated: [ISSUE-456] Avoid removing resources for multiple times (#459)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3f166f46 [ISSUE-456] Avoid removing resources for multiple times (#459)
3f166f46 is described below
commit 3f166f469a54d55c9abd5e3202a99dcd3f1440d9
Author: xianjingfeng <58...@qq.com>
AuthorDate: Tue Jan 10 10:38:20 2023 +0800
[ISSUE-456] Avoid removing resources for multiple times (#459)
### What changes were proposed in this pull request?
If the resources for an appId have already been removed, skip removing them a second time.
### Why are the changes needed?
When `removeResources` takes too long for some appIds, the `expiredAppCleanupExecutorService` in ShuffleTaskManager may check again and detect the same appId as expired multiple times. As a result, the
same appId might be added to `expiredAppIdQueue` multiple times. This PR fixes #456.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
---
.../apache/uniffle/server/ShuffleTaskManager.java | 8 ++-
.../uniffle/server/ShuffleTaskManagerTest.java | 57 +++++++++++++++++++++-
2 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index c753d941..cb9e8e3f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -534,7 +534,12 @@ public class ShuffleTaskManager {
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
final long start = System.currentTimeMillis();
- final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shuffleTaskInfos.get(appId).getCachedBlockIds();
+ ShuffleTaskInfo shffleTaskInfo = shuffleTaskInfos.remove(appId);
+ if (shffleTaskInfo == null) {
+ LOG.info("Resource for appId[" + appId + "] had been removed before.");
+ return;
+ }
+ final Map<Integer, Roaring64NavigableMap> shuffleToCachedBlockIds = shffleTaskInfo.getCachedBlockIds();
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
@@ -543,7 +548,6 @@ public class ShuffleTaskManager {
new AppPurgeEvent(appId, getUserByAppId(appId), new ArrayList<>(shuffleToCachedBlockIds.keySet()))
);
}
- shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 16344736..1dc6eb58 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
@@ -34,6 +36,7 @@ import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -512,6 +515,57 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
assertTrue(shuffleTaskManager.getCachedBlockIds("clearTest1", shuffleId).isEmpty());
}
+
+ @Test
+ public void clearMultiTimesTest() throws Exception {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ String storageBasePath = HDFS_URI + "rss/clearTest";
+ final int shuffleId = 1;
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+ conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(storageBasePath));
+ conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+ ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+ String appId = "clearMultiTimesTest";
+ shuffleTaskManager.registerShuffle(
+ appId,
+ shuffleId,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+ StringUtils.EMPTY
+ );
+ shuffleTaskManager.refreshAppId(appId);
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ shuffleTaskManager.checkResourceStatus();
+ assertEquals(Sets.newHashSet(appId), shuffleTaskManager.getAppIds());
+
+ CountDownLatch countDownLatch = new CountDownLatch(3);
+ for (int i = 0; i < 3; i++) {
+ new Thread(() -> {
+ try {
+ shuffleTaskManager.removeResources(appId);
+ } finally {
+ countDownLatch.countDown();
+ }
+ }).start();
+ }
+ countDownLatch.await();
+ assertEquals(Sets.newHashSet(), shuffleTaskManager.getAppIds());
+ assertTrue(shuffleTaskManager.getCachedBlockIds(appId, shuffleId).isEmpty());
+ }
+
@Test
public void getBlockIdsByPartitionIdTest() {
ShuffleServerConf conf = new ShuffleServerConf();
@@ -753,7 +807,8 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
assertTrue(appIdsOnDisk.contains(appId));
// make sure heartbeat timeout and resources are removed
- Thread.sleep(5000);
+ Awaitility.await().timeout(10, TimeUnit.SECONDS).until(
+ () -> shuffleTaskManager.getAppIds().size() == 0);
// Create the hidden dir to simulate LocalStorageChecker's check
String storageDir = tempDir.getAbsolutePath();