You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ck...@apache.org on 2023/02/08 07:12:17 UTC

[incubator-uniffle] branch master updated: [Followup #249] refactor: cleanup code and unify interfaces (#558)

This is an automated email from the ASF dual-hosted git repository.

ckj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 24ea73cf [Followup #249] refactor: cleanup code and unify interfaces (#558)
24ea73cf is described below

commit 24ea73cf8b4d9a9a270bd2ca798b571a2b8b4917
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Wed Feb 8 15:12:11 2023 +0800

    [Followup #249] refactor: cleanup code and unify interfaces (#558)
    
    ### What changes were proposed in this pull request?
    
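    Follow-up to #249. Clean up code and unify interfaces: `ShuffleBufferManager.removeBufferByShuffleId` and `ShuffleFlushManager.removeResourcesOfShuffleId` now both accept a `Collection<Integer>` of shuffle IDs, and the redundant `Optional.ofNullable(taskInfo)` wrappers inside the existing null check in `ShuffleTaskManager` are removed.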
    
    ### Why are the changes needed?
    
    Clean up code and unify interfaces.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI.
---
 .../java/org/apache/uniffle/server/ShuffleFlushManager.java  |  6 ++++--
 .../java/org/apache/uniffle/server/ShuffleTaskManager.java   | 12 +++++-------
 .../apache/uniffle/server/buffer/ShuffleBufferManager.java   |  8 ++------
 3 files changed, 11 insertions(+), 15 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index baaf2955..0d690493 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -361,8 +362,9 @@ public class ShuffleFlushManager {
     }
   }
 
-  public void removeResourcesOfShuffleId(String appId, int shuffleId) {
-    Optional.ofNullable(committedBlockIds.get(appId)).ifPresent(x -> x.remove(shuffleId));
+  public void removeResourcesOfShuffleId(String appId, Collection<Integer> shuffleIds) {
+    Optional.ofNullable(committedBlockIds.get(appId))
+        .ifPresent(shuffleIdToBlockIds -> shuffleIds.forEach(shuffleIdToBlockIds::remove));
   }
 
   private static class PendingShuffleFlushEvent {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 3654b05f..32f56485 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -535,9 +535,9 @@ public class ShuffleTaskManager {
     final ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
     if (taskInfo != null) {
       for (Integer shuffleId : shuffleIds) {
-        Optional.ofNullable(taskInfo).ifPresent(x -> x.getCachedBlockIds().remove(shuffleId));
-        Optional.ofNullable(taskInfo).ifPresent(x -> x.getCommitCounts().remove(shuffleId));
-        Optional.ofNullable(taskInfo).ifPresent(x -> x.getCommitLocks().remove(shuffleId));
+        taskInfo.getCachedBlockIds().remove(shuffleId);
+        taskInfo.getCommitCounts().remove(shuffleId);
+        taskInfo.getCommitLocks().remove(shuffleId);
       }
     }
     Optional.ofNullable(partitionsToBlockIds.get(appId)).ifPresent(x -> {
@@ -545,10 +545,8 @@ public class ShuffleTaskManager {
         x.remove(shuffleId);
       }
     });
-    shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds.toArray(new Integer[0]));
-    for (Integer shuffleId : shuffleIds) {
-      shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleId);
-    }
+    shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
+    shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
     storageManager.removeResources(
         new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)
     );
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8d966d3e..3bce9f87 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -24,7 +24,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -287,10 +286,7 @@ public class ShuffleBufferManager {
     if (shuffleIdToBuffers == null) {
       return;
     }
-    removeBufferByShuffleId(
-        appId,
-        shuffleIdToBuffers.keySet().stream().collect(Collectors.toList()).toArray(new Integer[0])
-    );
+    removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
     shuffleSizeMap.remove(appId);
     bufferPool.remove(appId);
   }
@@ -536,7 +532,7 @@ public class ShuffleBufferManager {
     shuffleIdSet.add(shuffleId);
   }
 
-  public void removeBufferByShuffleId(String appId, Integer... shuffleIds) {
+  public void removeBufferByShuffleId(String appId, Collection<Integer> shuffleIds) {
     Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = bufferPool.get(appId);
     if (shuffleIdToBuffers == null) {
       return;