You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/02/03 10:24:14 UTC

[incubator-uniffle] branch master updated: [Improvement] Only report to the shuffle servers that owns the blocks (#539)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b35e801 [Improvement] Only report to the shuffle servers that owns the blocks (#539)
2b35e801 is described below

commit 2b35e8016845729b0a34aa8facea926625258ac6
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Fri Feb 3 18:24:07 2023 +0800

    [Improvement] Only report to the shuffle servers that owns the blocks (#539)
    
    ### What changes were proposed in this pull request?
    Only report to the shuffle servers that own the blocks.
    
    ### Why are the changes needed?
    I found that unnecessary entries are shown in the shuffle server's log, like this:
    ```
    [INFO] 2023-02-02 10:59:04,045 Grpc-881 ShuffleServerGrpcService reportShuffleResult - Report 0 blocks as shuffle result for the task of appId[application_1672821343673_3878453_1675303644304], shuffleId[1619], taskAttemptId[1198007]
    ```
    As shown above, it is not necessary for the client to report 0 blocks to a shuffle server. Let's remove this logic.
    
    __Improvement__
    1. For the shuffle server, it reduces the log size and makes the log clearer.
    2. For the client, it speeds up reporting by avoiding unnecessary RPC requests.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    1. UTs
---
 .../client/impl/ShuffleWriteClientImpl.java        | 20 ++++++---
 .../uniffle/test/ShuffleWithRssClientTest.java     | 51 ++++++++++++++++++++++
 2 files changed, 66 insertions(+), 5 deletions(-)

diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 25f4a101..53877088 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -520,18 +520,28 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     Map<ShuffleServerInfo, List<Integer>> groupedPartitions = Maps.newHashMap();
     Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
     for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : partitionToServers.entrySet()) {
+      int partitionIdx = entry.getKey();
       for (ShuffleServerInfo ssi : entry.getValue()) {
         if (!groupedPartitions.containsKey(ssi)) {
-          groupedPartitions.putIfAbsent(ssi, Lists.newArrayList());
+          groupedPartitions.put(ssi, Lists.newArrayList());
         }
-        groupedPartitions.get(ssi).add(entry.getKey());
+        groupedPartitions.get(ssi).add(partitionIdx);
+      }
+      if (CollectionUtils.isNotEmpty(partitionToBlockIds.get(partitionIdx))) {
+        partitionReportTracker.putIfAbsent(partitionIdx, 0);
       }
-      partitionReportTracker.putIfAbsent(entry.getKey(), 0);
     }
+
     for (Map.Entry<ShuffleServerInfo, List<Integer>> entry : groupedPartitions.entrySet()) {
       Map<Integer, List<Long>> requestBlockIds = Maps.newHashMap();
       for (Integer partitionId : entry.getValue()) {
-        requestBlockIds.put(partitionId, partitionToBlockIds.get(partitionId));
+        List<Long> blockIds = partitionToBlockIds.get(partitionId);
+        if (CollectionUtils.isNotEmpty(blockIds)) {
+          requestBlockIds.put(partitionId, blockIds);
+        }
+      }
+      if (requestBlockIds.isEmpty()) {
+        continue;
       }
       RssReportShuffleResultRequest request = new RssReportShuffleResultRequest(
           appId, shuffleId, taskAttemptId, requestBlockIds, bitmapNum);
@@ -541,7 +551,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
         if (response.getStatusCode() == ResponseStatusCode.SUCCESS) {
           LOG.info("Report shuffle result to " + ssi + " for appId[" + appId
               + "], shuffleId[" + shuffleId + "] successfully");
-          for (Integer partitionId : entry.getValue()) {
+          for (Integer partitionId : requestBlockIds.keySet()) {
             partitionReportTracker.put(partitionId, partitionReportTracker.get(partitionId) + 1);
           }
         } else {
diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 0a7e1227..440adfbb 100644
--- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -150,6 +150,57 @@ public class ShuffleWithRssClientTest extends ShuffleReadWriteBase {
     assertEquals(blockIdBitmap, report);
   }
 
+  @Test
+  public void reportBlocksToShuffleServerIfNecessary() {
+    String testAppId = "reportBlocksToShuffleServerIfNecessary_appId";
+
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo1,
+        testAppId,
+        1,
+        Lists.newArrayList(new PartitionRange(1, 1)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
+
+    shuffleWriteClientImpl.registerShuffle(
+        shuffleServerInfo2,
+        testAppId,
+        1,
+        Lists.newArrayList(new PartitionRange(2, 2)),
+        new RemoteStorageInfo(""),
+        ShuffleDataDistributionType.NORMAL
+    );
+
+    Map<Integer, List<ShuffleServerInfo>> partitionToServers = Maps.newHashMap();
+    partitionToServers.put(1, Lists.newArrayList(shuffleServerInfo1));
+    partitionToServers.put(2, Lists.newArrayList(shuffleServerInfo2));
+    Map<Integer, List<Long>> partitionToBlocks = Maps.newHashMap();
+    List<Long> blockIds = Lists.newArrayList();
+
+    int partitionIdx = 1;
+    for (int i = 0; i < 5; i++) {
+      blockIds.add(ClientUtils.getBlockId(partitionIdx, 0, i));
+    }
+    partitionToBlocks.put(partitionIdx, blockIds);
+
+    // case1
+    shuffleWriteClientImpl
+        .reportShuffleResult(partitionToServers, testAppId, 1, 0, partitionToBlocks, 1);
+    Roaring64NavigableMap bitmap = shuffleWriteClientImpl
+        .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
+            1, 0);
+    assertTrue(bitmap.isEmpty());
+
+    bitmap = shuffleWriteClientImpl
+        .getShuffleResult("GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId,
+            1, partitionIdx);
+    assertEquals(5, bitmap.getLongCardinality());
+    for (int i = 0; i < 5; i++) {
+      assertTrue(bitmap.contains(partitionToBlocks.get(1).get(i)));
+    }
+  }
+
   @Test
   public void reportMultipleServerTest() throws Exception {
     String testAppId = "reportMultipleServerTest";