You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2023/05/29 14:59:13 UTC

[incubator-uniffle] branch master updated: [#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d25218d [#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)
7d25218d is described below

commit 7d25218dcde9c05a2f3043f15ca2614f74428b5b
Author: awdavidson <54...@users.noreply.github.com>
AuthorDate: Mon May 29 15:59:07 2023 +0100

    [#881] fix: Ensure LocalStorageMeta disk size is correctly updated when events are processed (#902)
    
    ### What changes were proposed in this pull request?
    Ensure all events are marked as `underStorage`; this results in the LocalStorageMeta being updated when events are processed.
    
    ### Why are the changes needed?
    Currently LocalStorageMeta is only updated with metrics from the first event for a given shuffleId and partitionId. The first event updates metrics because there is no entry in `partitionsOfStorage` and the event gets marked as `underStorage`; however, for subsequent events with the same shuffleId and partitionId, `selectStorage` returns the storage and does not mark the event as `underStorage`, so when `updateWriteMetrics` is called, `event.getUnderStorage()` returns null and `storage.updateWri [...]
    
    As metrics are not updated correctly, `LocalStorage.canWrite` will not return the correct result.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    Added a unit test which covers multiple events for the same shuffleId and partitionId.
---
 .../server/storage/LocalStorageManager.java        |  5 ++-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 36 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4080eba0..c1d423cd 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -181,6 +181,9 @@ public class LocalStorageManager extends SingleStorageManager {
               storage.getBasePath(), event);
         }
       } else {
+        if (event.getUnderStorage() == null) {
+          event.setUnderStorage(storage);
+        }
         return storage;
       }
     }
@@ -206,7 +209,7 @@ public class LocalStorageManager extends SingleStorageManager {
         (key, localStorage) -> {
           // If this is the first time to select storage or existing storage is corrupted,
           // we should refresh the cache.
-          if (localStorage == null || localStorage.isCorrupted()) {
+          if (localStorage == null || localStorage.isCorrupted() || event.getUnderStorage() == null) {
             event.setUnderStorage(selectedStorage);
             return selectedStorage;
           }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 4817b812..7de4c653 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
 import org.apache.uniffle.server.storage.HadoopStorageManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
 import org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy;
 import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -69,6 +70,7 @@ import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -233,6 +235,34 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
     waitForMetrics(ShuffleServerMetrics.gaugeWriteHandler, 0, 0.5);
   }
 
+  @Test
+  public void localMetricsTest(@TempDir File tempDir) throws Exception {
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
+
+    String appId = "localMetricsTest_appId";
+    StorageManager storageManager =
+            StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    ShuffleFlushManager manager =
+            new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager);
+    ShuffleDataFlushEvent event1 =
+            createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+    manager.addToFlushQueue(event1);
+    // wait for write data
+    waitForFlush(manager, appId, 1, 5);
+
+    validateLocalMetadata(storageManager, 160L);
+
+    ShuffleDataFlushEvent event12 =
+            createShuffleDataFlushEvent(appId, 1, 1, 1, null);
+    manager.addToFlushQueue(event12);
+
+    // wait for write data
+    waitForFlush(manager, appId, 1, 10);
+
+    validateLocalMetadata(storageManager, 320L);
+  }
+
   @Test
   public void complexWriteTest() throws Exception {
     shuffleServerConf.setString("rss.server.flush.handler.expired", "3");
@@ -587,4 +617,10 @@ public class ShuffleFlushManagerTest extends HadoopTestBase {
     assertEquals(eventNum + 3, (int) ShuffleServerMetrics.counterTotalDroppedEventNum.get());
     assertEquals(0, manager.getPendingEventsSize());
   }
+
+  private void validateLocalMetadata(StorageManager storageManager, Long size) {
+    assertInstanceOf(LocalStorageManager.class, storageManager);
+    LocalStorage localStorage = ((LocalStorageManager) storageManager).getStorages().get(0);
+    assertEquals(size, localStorage.getMetaData().getDiskSize().longValue());
+  }
 }