You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/13 10:01:06 UTC

[incubator-uniffle] branch master updated: [MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a8cdb04 [MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)
7a8cdb04 is described below

commit 7a8cdb044ddf19a5f659bd3879945fb41cd9329c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Feb 13 18:00:59 2023 +0800

    [MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)
    
    ### What changes were proposed in this pull request?
    
    Simplify `ShuffleWriteClientImpl#genServerToBlocks()`.
    
    ### Why are the changes needed?
    
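    To simplify the code logic: the nested `containsKey`/`put` chains are replaced with `computeIfAbsent`, and the recursive second pass over defective servers is replaced with a single stream that places non-defective servers before defective ones.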
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Covered by the existing test `ShuffleWriteClientImplTest#testSendDataWithDefectiveServers()`.
---
 .../client/impl/ShuffleWriteClientImpl.java        | 63 +++++++++-------------
 1 file changed, 24 insertions(+), 39 deletions(-)

diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 8b408ad4..27c7cfdd 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -18,6 +18,7 @@
 package org.apache.uniffle.client.impl;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +32,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
+import java.util.stream.Stream;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -210,54 +212,37 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
       ShuffleBlockInfo sbi,
       List<ShuffleServerInfo> serverList,
       int replicaNum,
-      List<ShuffleServerInfo> excludeServers,
+      Collection<ShuffleServerInfo> excludeServers,
       Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
       Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
       boolean excludeDefectiveServers) {
     if (replicaNum <= 0) {
       return;
     }
-    int partitionId = sbi.getPartitionId();
-    int shuffleId = sbi.getShuffleId();
-    int assignedNum = 0;
-    for (ShuffleServerInfo ssi : serverList) {
-      if (excludeDefectiveServers && replica > 1 && defectiveServers.contains(ssi)) {
-        continue;
-      }
-      if (CollectionUtils.isNotEmpty(excludeServers) && excludeServers.contains(ssi)) {
-        continue;
-      }
-      if (!serverToBlockIds.containsKey(ssi)) {
-        serverToBlockIds.put(ssi, Lists.newArrayList());
-      }
-      serverToBlockIds.get(ssi).add(sbi.getBlockId());
-
-      if (!serverToBlocks.containsKey(ssi)) {
-        serverToBlocks.put(ssi, Maps.newHashMap());
-      }
-      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = serverToBlocks.get(ssi);
-      if (!shuffleIdToBlocks.containsKey(shuffleId)) {
-        shuffleIdToBlocks.put(shuffleId, Maps.newHashMap());
-      }
 
-      Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = shuffleIdToBlocks.get(shuffleId);
-      if (!partitionToBlocks.containsKey(partitionId)) {
-        partitionToBlocks.put(partitionId, Lists.newArrayList());
-      }
-      partitionToBlocks.get(partitionId).add(sbi);
-      if (excludeServers != null) {
-        excludeServers.add(ssi);
-      }
-      assignedNum++;
-      if (assignedNum >= replicaNum) {
-        break;
-      }
+    Stream<ShuffleServerInfo> servers;
+    if (excludeDefectiveServers && CollectionUtils.isNotEmpty(defectiveServers)) {
+      servers = Stream.concat(serverList.stream().filter(x -> !defectiveServers.contains(x)),
+          serverList.stream().filter(defectiveServers::contains));
+    } else {
+      servers = serverList.stream();
+    }
+    if (excludeServers != null) {
+      servers = servers.filter(x -> !excludeServers.contains(x));
     }
 
-    if (assignedNum < replicaNum && excludeDefectiveServers) {
-      genServerToBlocks(sbi, serverList, replicaNum - assignedNum,
-          excludeServers, serverToBlocks, serverToBlockIds, false);
+    Stream<ShuffleServerInfo> selected = servers.limit(replicaNum);
+    if (excludeServers != null) {
+      selected = selected.peek(excludeServers::add);
     }
+    selected.forEach(ssi -> {
+      serverToBlockIds.computeIfAbsent(ssi, id -> Lists.newArrayList())
+          .add(sbi.getBlockId());
+      serverToBlocks.computeIfAbsent(ssi, id -> Maps.newHashMap())
+          .computeIfAbsent(sbi.getShuffleId(), id -> Maps.newHashMap())
+          .computeIfAbsent(sbi.getPartitionId(), id -> Lists.newArrayList())
+          .add(sbi);
+    });
   }
 
   /**
@@ -288,7 +273,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
     for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
       List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
       if (replicaSkipEnabled) {
-        List<ShuffleServerInfo> excludeServers = new ArrayList<>();
+        Set<ShuffleServerInfo> excludeServers = Sets.newHashSet();
         genServerToBlocks(sbi, allServers, replicaWrite, excludeServers,
             primaryServerToBlocks, primaryServerToBlockIds, true);
         genServerToBlocks(sbi, allServers,replica - replicaWrite,