You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/29 14:59:13 UTC
[incubator-uniffle] branch master updated: [#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7d25218d [#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)
7d25218d is described below
commit 7d25218dcde9c05a2f3043f15ca2614f74428b5b
Author: awdavidson <54...@users.noreply.github.com>
AuthorDate: Mon May 29 15:59:07 2023 +0100
[#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)
### What changes were proposed in this pull request?
Ensure all events are marked as `underStorage`; this results in the LocalStorageMeta being updated whenever events are processed.
### Why are the changes needed?
Currently LocalStorageMeta is only updated with metrics from the first event for a given shuffleId and partitionId. The first event updates metrics because there is no entry in `partitionsOfStorage` and the event gets marked as `underStorage`; however, for subsequent events with the same shuffleId and partitionId, `selectStorage` returns the storage and does not mark the event as `underStorage`, so when `updateWriteMetrics` is called, `event.getUnderStorage()` returns null and `storage.updateWri [...]
As metrics are not updated correctly, `LocalStorage.canWrite` will not return the correct result.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test that covers multiple events for the same shuffleId and partitionId.
---
.../server/storage/LocalStorageManager.java | 5 ++-
.../uniffle/server/ShuffleFlushManagerTest.java | 36 ++++++++++++++++++++++
2 files changed, 40 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4080eba0..c1d423cd 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -181,6 +181,9 @@ public class LocalStorageManager extends SingleStorageManager {
storage.getBasePath(), event);
}
} else {
+ if (event.getUnderStorage() == null) {
+ event.setUnderStorage(storage);
+ }
return storage;
}
}
@@ -206,7 +209,7 @@ public class LocalStorageManager extends SingleStorageManager {
(key, localStorage) -> {
// If this is the first time to select storage or existing storage is corrupted,
// we should refresh the cache.
- if (localStorage == null || localStorage.isCorrupted()) {
+ if (localStorage == null || localStorage.isCorrupted() || event.getUnderStorage() == null) {
event.setUnderStorage(selectedStorage);
return selectedStorage;
}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 4817b812..7de4c653 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
import org.apache.uniffle.server.storage.HadoopStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
@@ -69,6 +70,7 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -233,6 +235,34 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
}
+ @Test
+ public void localMetricsTest(@TempDir File tempDir) throws Exception {
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
+
+ String appId = "localMetricsTest_appId";
+ StorageManager storageManager =
+ StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
+ ShuffleDataFlushEvent event1 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event1);
+ // wait for write data
+ waitForFlush(manager, appId, 1, 5);
+
+ validateLocalMetadata(storageManager, 160L);
+
+ ShuffleDataFlushEvent event12 =
+ createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+ manager.addToFlushQueue(event12);
+
+ // wait for write data
+ waitForFlush(manager, appId, 1, 10);
+
+ validateLocalMetadata(storageManager, 320L);
+ }
+
@Test
public void complexWriteTest() throws Exception {
shuffleServerConf.setString("rss.server.flush.handler.expired", "3");
@@ -587,4 +617,10 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
assertEquals(0, manager.getPendingEventsSize());
}
+
+ private void validateLocalMetadata(StorageManager storageManager, Long size) {
+ assertInstanceOf(LocalStorageManager.class, storageManager);
+ LocalStorage localStorage = ((LocalStorageManager) storageManager).getStorages().get(0);
+ assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
+ }
}