You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/02/03 10:24:14 UTC
[incubator-uniffle] branch master updated: [Improvement] Only report to the shuffle servers that owns the blocks (#539)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2b35e801 [Improvement] Only report to the shuffle servers that owns the blocks (#539)
2b35e801 is described below
commit 2b35e8016845729b0a34aa8facea926625258ac6
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Fri Feb 3 18:24:07 2023 +0800
[Improvement] Only report to the shuffle servers that owns the blocks (#539)
### What changes were proposed in this pull request?
Only report to the shuffle servers that own the blocks.
### Why are the changes needed?
I found that unnecessary entries are written to the shuffle server's log, like this:
```
[INFO] 2023-02-02 10:59:04,045 Grpc-881 ShuffleServerGrpcService reportShuffleResult - Report 0 blocks as shuffle result for the task of appId[application_1672821343673_3878453_1675303644304], shuffleId[1619], taskAttemptId[1198007]
```
As shown above, it is not necessary for the client to report 0 blocks to a shuffle server. Let's remove this logic.
__Improvement__
1. For the shuffle server, it reduces the log size and makes the log clearer.
2. For the client, it speeds up result reporting by avoiding unnecessary RPC requests.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
1. UTs
---
.../client/impl/ShuffleWriteClientImpl.java | 20 ++++++---
.../uniffle/test/ShuffleWithRssClientTest.java | 51 ++++++++++++++++++++++
2 files changed, 66 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 25f4a101..53877088 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -520,18 +520,28 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newHashMap();
Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
+ int partitionIdx = entry.getKey();
for (ShuffleServerInfo ssi : entry.getValue()) {
if (!groupedPartitions.containsKey(ssi)) {
- groupedPartitions.putIfAbsent(ssi, Lists.newArrayList());
+ groupedPartitions.put(ssi, Lists.newArrayList());
}
- groupedPartitions.get(ssi).add(entry.getKey());
+ groupedPartitions.get(ssi).add(partitionIdx);
+ }
+ if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) {
+ partitionReportTracker.putIfAbsent(partitionIdx, 0);
}
- partitionReportTracker.putIfAbsent(entry.getKey(), 0);
}
+
for (Map.Entry<ShuffleServerInfo, List<Integer>> entry : groupedPartitions.entrySet()) {
Map<Integer, List<Long>> requestBlockIds = Maps.newHashMap();
for (Integer partitionId : entry.getValue()) {
- requestBlockIds.put(partitionId, partitionToBlockIds.get(partitionId));
+ List<Long> blockIds = partitionToBlockIds.get(partitionId);
+ if (CollectionUtils.isNotEmpty(blockIds)) {
+ requestBlockIds.put(partitionId, blockIds);
+ }
+ }
+ if (requestBlockIds.isEmpty()) {
+ continue;
}
RssReportShuffleResultRequest request = new RssReportShuffleResultRequest(
appId, shuffleId, taskAttemptId, requestBlockIds, bitmapNum);
@@ -541,7 +551,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
LOG.info("Report shuffle result to " + ssi + " for appId[" + appId
+ "], shuffleId[" + shuffleId + "] successfully");
- for (Integer partitionId : entry.getValue()) {
+ for (Integer partitionId : requestBlockIds.keySet()) {
partitionReportTracker.put(partitionId, partitionReportTracker.get(partitionId) + 1);
}
} else {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 0a7e1227..440adfbb 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -150,6 +150,57 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
assertEquals(blockIdBitmap, report);
}
+ @Test
+ public void reportBlocksToShuffleServerIfNecessary() { // registers partition 1 on server1 and partition 2 on server2, but reports blocks for partition 1 only
+ String testAppId = "reportBlocksToShuffleServerIfNecessary_appId";
+
+ shuffleWriteClientImpl.registerShuffle(
+ shuffleServerInfo1,
+ testAppId,
+ 1,
+ Lists.newArrayList(new PartitionRange(1, 1)),
+ new RemoteStorageInfo(""),
+ ShuffleDataDistributionType.NORMAL
+ );
+
+ shuffleWriteClientImpl.registerShuffle(
+ shuffleServerInfo2,
+ testAppId,
+ 1,
+ Lists.newArrayList(new PartitionRange(2, 2)),
+ new RemoteStorageInfo(""),
+ ShuffleDataDistributionType.NORMAL
+ );
+
+ Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
+ partitionToServers.put(1, Lists.newArrayList(shuffleServerInfo1));
+ partitionToServers.put(2, Lists.newArrayList(shuffleServerInfo2));
+ Map<Integer, List<Long>> partitionToBlocks = Maps.newHashMap();
+ List<Long> blockIds = Lists.newArrayList();
+
+ int partitionIdx = 1;
+ for (int i = 0; i < 5; i++) {
+ blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
+ }
+ partitionToBlocks.put(partitionIdx, blockIds);
+
+ // case1: only partition 1 (mapped to shuffleServerInfo1) has blocks; the lookup with last argument 0 must be empty, the lookup with partitionIdx must return all 5 block ids
+ shuffleWriteClientImpl
+ .reportShuffleResult(partitionToServers, testAppId, 1, 0, partitionToBlocks, 1);
+ Roaring64NavigableMap bitmap = shuffleWriteClientImpl
+ .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
+ 1, 0);
+ assertTrue(bitmap.isEmpty());
+
+ bitmap = shuffleWriteClientImpl
+ .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
+ 1, partitionIdx);
+ assertEquals(5, bitmap.getLongCardinality());
+ for (int i = 0; i < 5; i++) {
+ assertTrue(bitmap.contains(partitionToBlocks.get(1).get(i)));
+ }
+ }
+
@Test
public void reportMultipleServerTest() throws Exception {
String testAppId = "reportMultipleServerTest";