You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/01/06 15:11:49 UTC

[incubator-uniffle] branch master updated: [ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ebaff6aa [ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)
ebaff6aa is described below

commit ebaff6aa4eccfe4f54a967bc271da7e345f6c34a
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Fri Jan 6 23:11:43 2023 +0800

    [ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)
    
    
    ### What changes were proposed in this pull request?
    
    Record the data size of every partition for each app.
    
    ### Why are the changes needed?
    
    This is a subtask of #378.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    1. Unit tests
---
 .../uniffle/server/ShuffleServerGrpcService.java   |  2 +-
 .../org/apache/uniffle/server/ShuffleTaskInfo.java | 33 +++++++++++++++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 18 ++++++++
 .../apache/uniffle/server/ShuffleTaskInfoTest.java | 48 ++++++++++++++++++++++
 .../uniffle/server/ShuffleTaskManagerTest.java     | 43 +++++++++++++++++++
 5 files changed, 143 insertions(+), 1 deletion(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 97cd41f9..1e6999e3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -249,7 +249,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
             // after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
             manager.releasePreAllocatedSize(toReleasedSize);
             alreadyReleasedSize += toReleasedSize;
-            manager.updateCachedBlockIds(appId, shuffleId, spd.getBlockList());
+            manager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
           }
         } catch (Exception e) {
           String errorMsg = "Error happened when shuffleEngine.write for "
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index aeb00887..46becf95 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Maps;
@@ -46,6 +47,12 @@ public class ShuffleTaskInfo {
 
   private AtomicReference<ShuffleDataDistributionType> dataDistType;
 
+  private final AtomicLong totalDataSize = new AtomicLong(0); // sum over all partitions
+  /**
+   * shuffleId -> partitionId -> accumulated data size of that partition
+   */
+  private Map<Integer, Map<Integer, Long>> partitionDataSizes;
+
   public ShuffleTaskInfo() {
     this.currentTimes = System.currentTimeMillis();
     this.commitCounts = Maps.newConcurrentMap();
@@ -53,6 +60,7 @@ public class ShuffleTaskInfo {
     this.cachedBlockIds = Maps.newConcurrentMap();
     this.user = new AtomicReference<>();
     this.dataDistType = new AtomicReference<>();
+    this.partitionDataSizes = Maps.newConcurrentMap();
   }
 
   public Long getCurrentTimes() {
@@ -91,4 +99,29 @@ public class ShuffleTaskInfo {
   public ShuffleDataDistributionType getDataDistType() {
     return dataDistType.get();
   }
+
+  /** Adds {@code delta} to the size recorded for the partition and to the app's total size. */
+  public void addPartitionDataSize(int shuffleId, int partitionId, long delta) {
+    totalDataSize.addAndGet(delta);
+    // merge() folds the delta into the partition's size in a single map call.
+    partitionDataSizes
+        .computeIfAbsent(shuffleId, key -> Maps.newConcurrentMap())
+        .merge(partitionId, delta, Long::sum);
+  }
+
+  /** Returns the sum of every delta passed to addPartitionDataSize. */
+  public long getTotalDataSize() {
+    return totalDataSize.get();
+  }
+
+  /** Returns the size recorded for the partition, or 0 if nothing was added for it. */
+  public long getPartitionDataSize(int shuffleId, int partitionId) {
+    Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
+    // No map yet means no data was ever added for this shuffle.
+    if (partitions == null) {
+      return 0L;
+    }
+    return partitions.getOrDefault(partitionId, 0L);
+  }
+
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index ad93ac53..c753d941 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -297,18 +297,31 @@ public class ShuffleTaskManager {
     return commitNum.incrementAndGet();
   }
 
+  @VisibleForTesting
   public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) {
+    // Test-only overload: the sizes of these blocks are recorded under partition 0.
+    updateCachedBlockIds(appId, shuffleId, 0, spbs);
+  }
+
+  /** Caches the ids of {@code spbs} for the shuffle and adds their total size to the partition. */
+  public void updateCachedBlockIds(String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) {
     if (spbs == null || spbs.length == 0) {
       return;
     }
     ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo());
     Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
         .computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
+
+    // Summed in the same pass that registers the block ids.
+    long size = 0L;
     synchronized (bitmap) {
       for (ShufflePartitionedBlock spb : spbs) {
         bitmap.addLong(spb.getBlockId());
+        size += spb.getSize();
       }
     }
+    // Accounts the batch to the partition and to the app total.
+    shuffleTaskInfo.addPartitionDataSize(shuffleId, partitionId, size);
   }
 
   public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
@@ -603,6 +616,11 @@ public class ShuffleTaskManager {
     return shuffleTaskInfos.get(appId).getDataDistType();
   }
 
+  @VisibleForTesting
+  public ShuffleTaskInfo getShuffleTaskInfo(String appId) {
+    return shuffleTaskInfos.get(appId); // null if no task info exists for appId
+  }
+
   private void triggerFlush() {
     synchronized (this.shuffleBufferManager) {
       this.shuffleBufferManager.flushIfNecessary();
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
new file mode 100644
index 00000000..4c407a6e
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShuffleTaskInfoTest {
+
+  @Test
+  public void partitionSizeSummaryTest() {
+    ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+    // case1: nothing recorded yet
+    long size = shuffleTaskInfo.getPartitionDataSize(0, 0);
+    assertEquals(0, size);
+
+    // case2: first write
+    shuffleTaskInfo.addPartitionDataSize(0, 0, 1000);
+    size = shuffleTaskInfo.getPartitionDataSize(0, 0);
+    assertEquals(1000, size);
+
+    // case3: subsequent writes accumulate
+    shuffleTaskInfo.addPartitionDataSize(0, 0, 500);
+    size = shuffleTaskInfo.getPartitionDataSize(0, 0);
+    assertEquals(1500, size);
+
+    assertEquals(1500, shuffleTaskInfo.getTotalDataSize());
+    // other partitions and shuffles are tracked separately
+    assertEquals(0, shuffleTaskInfo.getPartitionDataSize(0, 1));
+    assertEquals(0, shuffleTaskInfo.getPartitionDataSize(1, 0));
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 86a8dcdf..16344736 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -75,6 +75,49 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
     ShuffleServerMetrics.clear();
   }
 
+  @Test
+  public void partitionDataSizeSummaryTest() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+
+    String appId = "partitionDataSizeSummaryTest";
+    int shuffleId = 1;
+    int partitionId = 1;
+
+    shuffleTaskManager.registerShuffle(
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
+
+    // case1: the first batch is recorded for the partition and the app total
+    ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+    long size1 = partitionedData0.getTotalBlockSize();
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionId, partitionedData0.getBlockList());
+    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
+    assertEquals(size1, shuffleTaskInfo.getTotalDataSize());
+    assertEquals(size1, shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId));
+
+    // case2: a second batch for the same partition accumulates
+    ShufflePartitionedData partitionedData1 = createPartitionedData(1, 1, 35);
+    long size2 = partitionedData1.getTotalBlockSize();
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionId, partitionedData1.getBlockList());
+    assertEquals(size1 + size2, shuffleTaskInfo.getTotalDataSize());
+    assertEquals(size1 + size2, shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId));
+
+    // case3: another partition is tracked separately but still adds to the app total
+    ShufflePartitionedData partitionedData2 = createPartitionedData(1, 1, 35);
+    long size3 = partitionedData2.getTotalBlockSize();
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 0, partitionedData2.getBlockList());
+    assertEquals(size3, shuffleTaskInfo.getPartitionDataSize(shuffleId, 0));
+    assertEquals(size1 + size2 + size3, shuffleTaskInfo.getTotalDataSize());
+  }
+
   @Test
   public void registerShuffleTest() throws Exception {
     String confFile = ClassLoader.getSystemResource("server.conf").getFile();