You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/01/06 15:11:49 UTC
[incubator-uniffle] branch master updated: [ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ebaff6aa [ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)
ebaff6aa is described below
commit ebaff6aa4eccfe4f54a967bc271da7e345f6c34a
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Fri Jan 6 23:11:43 2023 +0800
[ISSUE-378][HugePartition][Part-1] Record every partition data size for one app (#458)
### What changes were proposed in this pull request?
Record the data size of every partition of an app.
### Why are the changes needed?
This is a subtask of #378.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. Unit tests: ShuffleTaskInfoTest and ShuffleTaskManagerTest
---
.../uniffle/server/ShuffleServerGrpcService.java | 2 +-
.../org/apache/uniffle/server/ShuffleTaskInfo.java | 33 +++++++++++++++
.../apache/uniffle/server/ShuffleTaskManager.java | 18 ++++++++
.../apache/uniffle/server/ShuffleTaskInfoTest.java | 48 ++++++++++++++++++++++
.../uniffle/server/ShuffleTaskManagerTest.java | 43 +++++++++++++++++++
5 files changed, 143 insertions(+), 1 deletion(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 97cd41f9..1e6999e3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -249,7 +249,7 @@ public class ShuffleServerGrpcService extends ShuffleServerImplBase {
// after each cacheShuffleData call, the `preAllocatedSize` is updated timely.
manager.releasePreAllocatedSize(toReleasedSize);
alreadyReleasedSize += toReleasedSize;
- manager.updateCachedBlockIds(appId, shuffleId, spd.getBlockList());
+ manager.updateCachedBlockIds(appId, shuffleId, spd.getPartitionId(), spd.getBlockList());
}
} catch (Exception e) {
String errorMsg = "Error happened when shuffleEngine.write for "
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index aeb00887..46becf95 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Maps;
@@ -46,6 +47,12 @@ public class ShuffleTaskInfo {
private AtomicReference<ShuffleDataDistributionType> dataDistType;
+ private AtomicLong totalDataSize = new AtomicLong(0);
+ /**
+ * shuffleId -> partitionId -> partition shuffle data size
+ */
+ private Map<Integer, Map<Integer, Long>> partitionDataSizes;
+
public ShuffleTaskInfo() {
this.currentTimes = System.currentTimeMillis();
this.commitCounts = Maps.newConcurrentMap();
@@ -53,6 +60,7 @@ public class ShuffleTaskInfo {
this.cachedBlockIds = Maps.newConcurrentMap();
this.user = new AtomicReference<>();
this.dataDistType = new AtomicReference<>();
+ this.partitionDataSizes = Maps.newConcurrentMap();
}
public Long getCurrentTimes() {
@@ -91,4 +99,29 @@ public class ShuffleTaskInfo {
public ShuffleDataDistributionType getDataDistType() {
return dataDistType.get();
}
+
+ /**
+ * Adds {@code delta} to the recorded data size of one partition and to the app-wide total.
+ *
+ * @param shuffleId shuffle the partition belongs to
+ * @param partitionId partition whose recorded size is increased
+ * @param delta amount added to both the partition's size and the total
+ */
+ public void addPartitionDataSize(int shuffleId, int partitionId, long delta) {
+ totalDataSize.addAndGet(delta);
+ // computeIfAbsent returns the (possibly new) inner map, and merge() performs the
+ // read-modify-write as one atomic call on the concurrent map, replacing the former
+ // get + putIfAbsent + computeIfPresent sequence.
+ partitionDataSizes
+ .computeIfAbsent(shuffleId, key -> Maps.newConcurrentMap())
+ .merge(partitionId, delta, Long::sum);
+ }
+
+ /**
+ * Returns the sum of all sizes recorded for this app, over every shuffle and partition.
+ *
+ * @return the current total (0 if nothing has been recorded)
+ */
+ public long getTotalDataSize() {
+ return totalDataSize.longValue();
+ }
+
+ /**
+ * Returns the accumulated data size recorded for one partition of one shuffle.
+ *
+ * @param shuffleId shuffle the partition belongs to
+ * @param partitionId partition to look up
+ * @return the recorded size, or 0 if nothing was recorded for this shuffle/partition
+ */
+ public long getPartitionDataSize(int shuffleId, int partitionId) {
+ Map<Integer, Long> partitions = partitionDataSizes.get(shuffleId);
+ if (partitions == null) {
+ return 0L;
+ }
+ return partitions.getOrDefault(partitionId, 0L);
+ }
+
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index ad93ac53..c753d941 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -297,18 +297,31 @@ public class ShuffleTaskManager {
return commitNum.incrementAndGet();
}
+ /**
+ * Test-only overload that records the blocks under partition 0, so any partition-level size it
+ * adds is attributed to partition 0 regardless of the blocks' actual partition.
+ * Production code should call the overload that takes an explicit partitionId.
+ */
+ @VisibleForTesting
public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) {
+ updateCachedBlockIds(appId, shuffleId, 0, spbs);
+ }
+
+ /**
+ * Marks the given blocks as cached for (appId, shuffleId) and adds their summed sizes to the
+ * data size recorded for (shuffleId, partitionId) in the app's ShuffleTaskInfo.
+ * The app's ShuffleTaskInfo and the shuffle's block-id bitmap are created on first use;
+ * a null or empty block array is a no-op.
+ */
+ public void updateCachedBlockIds(String appId, int shuffleId, int partitionId, ShufflePartitionedBlock[] spbs) {
if (spbs == null || spbs.length == 0) {
return;
}
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo());
Roaring64NavigableMap bitmap = shuffleTaskInfo.getCachedBlockIds()
.computeIfAbsent(shuffleId, x -> Roaring64NavigableMap.bitmapOf());
+
+ // NOTE(review): the bitmap holds each block id once, but every block passed in is added to
+ // `size`, so a block that is sent again would be counted twice — confirm callers never resend.
+ long size = 0L;
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : spbs) {
bitmap.addLong(spb.getBlockId());
+ size += spb.getSize();
}
}
+ // Recorded after the bitmap lock is released; addPartitionDataSize itself only updates an
+ // AtomicLong and concurrent maps.
+ shuffleTaskInfo.addPartitionDataSize(
+ shuffleId,
+ partitionId,
+ size
+ );
}
public Roaring64NavigableMap getCachedBlockIds(String appId, int shuffleId) {
@@ -603,6 +616,11 @@ public class ShuffleTaskManager {
return shuffleTaskInfos.get(appId).getDataDistType();
}
+ @VisibleForTesting
+ public ShuffleTaskInfo getShuffleTaskInfo(String appId) {
+ return shuffleTaskInfos.get(appId);
+ }
+
private void triggerFlush() {
synchronized (this.shuffleBufferManager) {
this.shuffleBufferManager.flushIfNecessary();
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
new file mode 100644
index 00000000..4c407a6e
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskInfoTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class ShuffleTaskInfoTest {
+
+ /**
+ * Verifies per-partition size accumulation and the app-wide total, including that sizes
+ * recorded for one partition or shuffle do not leak into another.
+ */
+ @Test
+ public void partitionSizeSummaryTest() {
+ ShuffleTaskInfo shuffleTaskInfo = new ShuffleTaskInfo();
+ // case1: nothing recorded yet
+ assertEquals(0, shuffleTaskInfo.getPartitionDataSize(0, 0));
+ assertEquals(0, shuffleTaskInfo.getTotalDataSize());
+
+ // case2: first write to a partition
+ shuffleTaskInfo.addPartitionDataSize(0, 0, 1000);
+ assertEquals(1000, shuffleTaskInfo.getPartitionDataSize(0, 0));
+
+ // case3: further writes to the same partition accumulate
+ shuffleTaskInfo.addPartitionDataSize(0, 0, 500);
+ assertEquals(1500, shuffleTaskInfo.getPartitionDataSize(0, 0));
+ assertEquals(1500, shuffleTaskInfo.getTotalDataSize());
+
+ // case4: another partition of the same shuffle is tracked separately
+ shuffleTaskInfo.addPartitionDataSize(0, 1, 200);
+ assertEquals(200, shuffleTaskInfo.getPartitionDataSize(0, 1));
+ assertEquals(1500, shuffleTaskInfo.getPartitionDataSize(0, 0));
+
+ // case5: the same partition id under another shuffle is tracked separately
+ shuffleTaskInfo.addPartitionDataSize(1, 0, 300);
+ assertEquals(300, shuffleTaskInfo.getPartitionDataSize(1, 0));
+ assertEquals(1500, shuffleTaskInfo.getPartitionDataSize(0, 0));
+
+ // the total covers every shuffle and partition
+ assertEquals(2000, shuffleTaskInfo.getTotalDataSize());
+ }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 86a8dcdf..16344736 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -75,6 +75,49 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
ShuffleServerMetrics.clear();
}
+ /**
+ * Verifies that updateCachedBlockIds accumulates block sizes into both the app total and the
+ * size of the target partition, and leaves other partitions untouched.
+ */
+ @Test
+ public void partitionDataSizeSummaryTest() throws Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.MEMORY_LOCALFILE.name());
+ ShuffleServer shuffleServer = new ShuffleServer(conf);
+ ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+
+ String appId = "partitionDataSizeSummaryTest";
+ int shuffleId = 1;
+ int partitionId = 1;
+
+ shuffleTaskManager.registerShuffle(
+ appId,
+ shuffleId,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+ StringUtils.EMPTY
+ );
+
+ // case1: first batch of blocks for the partition
+ ShufflePartitionedData firstBatch = createPartitionedData(partitionId, 1, 35);
+ long size1 = firstBatch.getTotalBlockSize();
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionId, firstBatch.getBlockList());
+ ShuffleTaskInfo shuffleTaskInfo = shuffleTaskManager.getShuffleTaskInfo(appId);
+ assertEquals(size1, shuffleTaskInfo.getTotalDataSize());
+ assertEquals(size1, shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId));
+
+ // case2: a second batch accumulates on top of the first
+ ShufflePartitionedData secondBatch = createPartitionedData(partitionId, 1, 35);
+ long size2 = secondBatch.getTotalBlockSize();
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, partitionId, secondBatch.getBlockList());
+ assertEquals(size1 + size2, shuffleTaskInfo.getTotalDataSize());
+ assertEquals(size1 + size2, shuffleTaskInfo.getPartitionDataSize(shuffleId, partitionId));
+
+ // a partition that received no data reports zero
+ assertEquals(0, shuffleTaskInfo.getPartitionDataSize(shuffleId, 0));
+ }
+
@Test
public void registerShuffleTest() throws Exception {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();