You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/13 10:01:06 UTC
[incubator-uniffle] branch master updated: [MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7a8cdb04 [MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)
7a8cdb04 is described below
commit 7a8cdb044ddf19a5f659bd3879945fb41cd9329c
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Mon Feb 13 18:00:59 2023 +0800
[MINOR] refactor: simplify ShuffleWriteClientImpl#genServerToBlocks() (#594)
### What changes were proposed in this pull request?
Simplify `ShuffleWriteClientImpl#genServerToBlocks()`.
### Why are the changes needed?
Simplify code logic.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by `ShuffleWriteClientImplTest#testSendDataWithDefectiveServers()`.
---
.../client/impl/ShuffleWriteClientImpl.java | 63 +++++++++-------------
1 file changed, 24 insertions(+), 39 deletions(-)
diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 8b408ad4..27c7cfdd 100644
--- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.client.impl;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -31,6 +32,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
+import java.util.stream.Stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -210,54 +212,37 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
ShuffleBlockInfo sbi,
List<ShuffleServerInfo> serverList,
int replicaNum,
- List<ShuffleServerInfo> excludeServers,
+ Collection<ShuffleServerInfo> excludeServers,
Map<ShuffleServerInfo, Map<Integer, Map<Integer, List<ShuffleBlockInfo>>>> serverToBlocks,
Map<ShuffleServerInfo, List<Long>> serverToBlockIds,
boolean excludeDefectiveServers) {
if (replicaNum <= 0) {
return;
}
- int partitionId = sbi.getPartitionId();
- int shuffleId = sbi.getShuffleId();
- int assignedNum = 0;
- for (ShuffleServerInfo ssi : serverList) {
- if (excludeDefectiveServers && replica > 1 && defectiveServers.contains(ssi)) {
- continue;
- }
- if (CollectionUtils.isNotEmpty(excludeServers) && excludeServers.contains(ssi)) {
- continue;
- }
- if (!serverToBlockIds.containsKey(ssi)) {
- serverToBlockIds.put(ssi, Lists.newArrayList());
- }
- serverToBlockIds.get(ssi).add(sbi.getBlockId());
-
- if (!serverToBlocks.containsKey(ssi)) {
- serverToBlocks.put(ssi, Maps.newHashMap());
- }
- Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = serverToBlocks.get(ssi);
- if (!shuffleIdToBlocks.containsKey(shuffleId)) {
- shuffleIdToBlocks.put(shuffleId, Maps.newHashMap());
- }
- Map<Integer, List<ShuffleBlockInfo>> partitionToBlocks = shuffleIdToBlocks.get(shuffleId);
- if (!partitionToBlocks.containsKey(partitionId)) {
- partitionToBlocks.put(partitionId, Lists.newArrayList());
- }
- partitionToBlocks.get(partitionId).add(sbi);
- if (excludeServers != null) {
- excludeServers.add(ssi);
- }
- assignedNum++;
- if (assignedNum >= replicaNum) {
- break;
- }
+ Stream<ShuffleServerInfo> servers;
+ if (excludeDefectiveServers && CollectionUtils.isNotEmpty(defectiveServers)) {
+ servers = Stream.concat(serverList.stream().filter(x -> !defectiveServers.contains(x)),
+ serverList.stream().filter(defectiveServers::contains));
+ } else {
+ servers = serverList.stream();
+ }
+ if (excludeServers != null) {
+ servers = servers.filter(x -> !excludeServers.contains(x));
}
- if (assignedNum < replicaNum && excludeDefectiveServers) {
- genServerToBlocks(sbi, serverList, replicaNum - assignedNum,
- excludeServers, serverToBlocks, serverToBlockIds, false);
+ Stream<ShuffleServerInfo> selected = servers.limit(replicaNum);
+ if (excludeServers != null) {
+ selected = selected.peek(excludeServers::add);
}
+ selected.forEach(ssi -> {
+ serverToBlockIds.computeIfAbsent(ssi, id -> Lists.newArrayList())
+ .add(sbi.getBlockId());
+ serverToBlocks.computeIfAbsent(ssi, id -> Maps.newHashMap())
+ .computeIfAbsent(sbi.getShuffleId(), id -> Maps.newHashMap())
+ .computeIfAbsent(sbi.getPartitionId(), id -> Lists.newArrayList())
+ .add(sbi);
+ });
}
/**
@@ -288,7 +273,7 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient {
for (ShuffleBlockInfo sbi : shuffleBlockInfoList) {
List<ShuffleServerInfo> allServers = sbi.getShuffleServerInfos();
if (replicaSkipEnabled) {
- List<ShuffleServerInfo> excludeServers = new ArrayList<>();
+ Set<ShuffleServerInfo> excludeServers = Sets.newHashSet();
genServerToBlocks(sbi, allServers, replicaWrite, excludeServers,
primaryServerToBlocks, primaryServerToBlockIds, true);
genServerToBlocks(sbi, allServers,replica - replicaWrite,