You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2024/02/04 08:54:24 UTC
(incubator-uniffle) branch master updated: [#1501] fix(server): storage selection cache accidentally deleted when clearing stage level data. (#1505)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 576a925a2 [#1501] fix(server): storage selection cache accidentally deleted when clearing stage level data. (#1505)
576a925a2 is described below
commit 576a925a21576467ab8713ca1d4b569d0a0af5c8
Author: dingshun3016 <di...@163.com>
AuthorDate: Sun Feb 4 16:54:18 2024 +0800
[#1501] fix(server): storage selection cache accidentally deleted when clearing stage level data. (#1505)
### What changes were proposed in this pull request?
Avoid accidentally deleting storage selection cache entries that belong to other apps or shuffles when purging app-level or stage-level data.
### Why are the changes needed?
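Fixes #1501. The purge prefix key is now built with a trailing empty component, so that purging one app or shuffle no longer removes cache entries of another whose ID merely starts with the same characters (e.g. shuffle 1 vs. shuffle 11, or appId1 vs. appId12).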
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests
---
.../uniffle/server/storage/LocalStorageManager.java | 4 ++--
.../apache/uniffle/server/ShuffleFlushManagerTest.java | 17 ++++++++++++++++-
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 229b309a1..772aae1d0 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -309,12 +309,12 @@ public class LocalStorageManager extends SingleStorageManager {
Function<String, Boolean> deleteConditionFunc = null;
String prefixKey = null;
if (event instanceof AppPurgeEvent) {
- prefixKey = UnionKey.buildKey(event.getAppId());
+ prefixKey = UnionKey.buildKey(event.getAppId(), "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId());
} else if (event instanceof ShufflePurgeEvent) {
int shuffleId = event.getShuffleIds().get(0);
- prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId);
+ prefixKey = UnionKey.buildKey(event.getAppId(), shuffleId, "");
deleteConditionFunc =
partitionUnionKey -> UnionKey.startsWith(partitionUnionKey, event.getAppId(), shuffleId);
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index f2b0b3048..c17087073 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -72,6 +72,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -454,7 +455,7 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
@Test
public void clearLocalTest(@TempDir File tempDir) throws Exception {
final String appId1 = "clearLocalTest_appId1";
- final String appId2 = "clearLocalTest_appId2";
+ final String appId2 = "clearLocalTest_appId12";
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.set(
ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
@@ -470,14 +471,18 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
manager.addToFlushQueue(event2);
ShuffleDataFlushEvent event3 = createShuffleDataFlushEvent(appId2, 2, 0, 1, null);
manager.addToFlushQueue(event3);
+ ShuffleDataFlushEvent event5 = createShuffleDataFlushEvent(appId2, 11, 0, 1, null);
+ manager.addToFlushQueue(event5);
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
final AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
waitForFlush(manager, appId2, 2, 5);
+ waitForFlush(manager, appId2, 11, 5);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
+ assertEquals(5, manager.getCommittedBlockIds(appId2, 11).getLongCardinality());
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
@@ -490,6 +495,10 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
new AppPurgeEvent(appId1, StringUtils.EMPTY, Lists.newArrayList(1)));
manager.removeResources(appId1);
assertFalse(file.exists());
+
+ ShuffleDataReadEvent shuffleReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
+ assertNotNull(storageManager.selectStorage(shuffleReadEvent));
+
assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 2).getLongCardinality());
@@ -497,6 +506,12 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
manager.removeResources(appId2);
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(1)));
+
+ ShuffleDataReadEvent shuffle1ReadEvent = new ShuffleDataReadEvent(appId2, 1, 0, 0);
+ ShuffleDataReadEvent shuffle11ReadEvent = new ShuffleDataReadEvent(appId2, 11, 0, 0);
+ assertNull(storageManager.selectStorage(shuffle1ReadEvent));
+ assertNotNull(storageManager.selectStorage(shuffle11ReadEvent));
+
storageManager.removeResources(
new ShufflePurgeEvent(appId2, StringUtils.EMPTY, Lists.newArrayList(2)));
storageManager.removeResources(