You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/08 07:12:17 UTC
[incubator-uniffle] branch master updated: [Followup #249] refactor: cleanup code and unify interfaces (#558)
This is an automated email from the ASF dual-hosted git repository.
ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 24ea73cf [Followup #249] refactor: cleanup code and unify interfaces (#558)
24ea73cf is described below
commit 24ea73cf8b4d9a9a270bd2ca798b571a2b8b4917
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 8 15:12:11 2023 +0800
[Followup #249] refactor: cleanup code and unify interfaces (#558)
### What changes were proposed in this pull request?
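Follow-up to #249. Clean up code and unify interfaces: `ShuffleFlushManager.removeResourcesOfShuffleId` and `ShuffleBufferManager.removeBufferByShuffleId` now both accept a `Collection<Integer>` of shuffle IDs.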
### Why are the changes needed?
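To clean up the code and unify the interfaces: the `Optional.ofNullable(taskInfo)` wrappers in `ShuffleTaskManager` were redundant inside an existing null check, and callers had to convert collections to arrays or loop over individual shuffle IDs because the two removal methods took different parameter types.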
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
---
.../java/org/apache/uniffle/server/ShuffleFlushManager.java | 6 ++++--
.../java/org/apache/uniffle/server/ShuffleTaskManager.java | 12 +++++-------
.../apache/uniffle/server/buffer/ShuffleBufferManager.java | 8 ++------
3 files changed, 11 insertions(+), 15 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index baaf2955..0d690493 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -361,8 +362,9 @@ public class ShuffleFlushManager {
}
}
- public void removeResourcesOfShuffleId(String appId, int shuffleId) {
- Optional.ofNullable(committedBlockIds.get(appId)).ifPresent(x -> x.remove(shuffleId));
+ public void removeResourcesOfShuffleId(String appId, Collection<Integer> shuffleIds) {
+ Optional.ofNullable(committedBlockIds.get(appId))
+ .ifPresent(shuffleIdToBlockIds -> shuffleIds.forEach(shuffleIdToBlockIds::remove));
}
private static class PendingShuffleFlushEvent {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 3654b05f..32f56485 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -535,9 +535,9 @@ public class ShuffleTaskManager {
final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
if (taskInfo != null) {
for (Integer shuffleId : shuffleIds) {
- Optional.ofNullable(taskInfo).ifPresent(x -> x.getCachedBlockIds().remove(shuffleId));
- Optional.ofNullable(taskInfo).ifPresent(x -> x.getCommitCounts().remove(shuffleId));
- Optional.ofNullable(taskInfo).ifPresent(x -> x.getCommitLocks().remove(shuffleId));
+ taskInfo.getCachedBlockIds().remove(shuffleId);
+ taskInfo.getCommitCounts().remove(shuffleId);
+ taskInfo.getCommitLocks().remove(shuffleId);
}
}
Optional.ofNullable(partitionsToBlockIds.get(appId)).ifPresent(x -> {
@@ -545,10 +545,8 @@ public class ShuffleTaskManager {
x.remove(shuffleId);
}
});
- shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds.toArray(new Integer[0]));
- for (Integer shuffleId : shuffleIds) {
- shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleId);
- }
+ shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
+ shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
storageManager.removeResources(
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)
);
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8d966d3e..3bce9f87 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,7 +24,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
@@ -287,10 +286,7 @@ public class ShuffleBufferManager {
if (shuffleIdToBuffers == null) {
return;
}
- removeBufferByShuffleId(
- appId,
- shuffleIdToBuffers.keySet().stream().collect(Collectors.toList()).toArray(new Integer[0])
- );
+ removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
shuffleSizeMap.remove(appId);
bufferPool.remove(appId);
}
@@ -536,7 +532,7 @@ public class ShuffleBufferManager {
shuffleIdSet.add(shuffleId);
}
- public void removeBufferByShuffleId(String appId, Integer... shuffleIds) {
+ public void removeBufferByShuffleId(String appId, Collection<Integer> shuffleIds) {
Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = bufferPool.get(appId);
if (shuffleIdToBuffers == null) {
return;