Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/08/16 08:40:58 UTC

[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #158: [Improvement] Introduce a new class ShuffleTaskInfo

jerqi commented on code in PR #158:
URL: https://github.com/apache/incubator-uniffle/pull/158#discussion_r946490458


##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -74,12 +74,7 @@ public class ShuffleTaskManager {
   // but when get blockId, performance will degrade a little which can be optimized by client configuration
   private Map<String, Map<Integer, Roaring64NavigableMap[]>> partitionsToBlockIds;
   private ShuffleBufferManager shuffleBufferManager;
-  private Map<String, Long> appIds = Maps.newConcurrentMap();
-  // appId -> shuffleId -> commit count
-  private Map<String, Map<Integer, AtomicInteger>> commitCounts = Maps.newConcurrentMap();
-  private Map<String, Map<Integer, Object>> commitLocks = Maps.newConcurrentMap();
-  // appId -> shuffleId -> blockIds
-  private Map<String, Map<Integer, Roaring64NavigableMap>> cachedBlockIds = Maps.newConcurrentMap();
+  private Map<String, ShuffleTaskInfo> shuffleTaskInfo = Maps.newConcurrentMap();

Review Comment:
   `shuffleTaskInfo` -> `shuffleTaskInfos`?



##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -161,10 +156,9 @@ public StatusCode commitShuffle(String appId, int shuffleId) throws Exception {
     refreshAppId(appId);
     Roaring64NavigableMap cachedBlockIds = getCachedBlockIds(appId, shuffleId);
     Roaring64NavigableMap cloneBlockIds;
-    commitLocks.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, Object> shuffleLevelLocks = commitLocks.get(appId);
-    shuffleLevelLocks.putIfAbsent(shuffleId, new Object());
-    Object lock = shuffleLevelLocks.get(shuffleId);
+    shuffleTaskInfo.computeIfAbsent(appId, x -> new ShuffleTaskInfo())

Review Comment:
   ```
   ShuffleTaskInfo taskInfo = shuffleTaskInfo.computeIfAbsent(appId, x -> new ShuffleTaskInfo());
   Object lock = taskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object());
   ```
   This seems clearer.
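
A minimal, self-contained sketch of the lock lookup suggested here, in case it helps. `TaskInfoSketch` and `CommitLockSketch` are hypothetical stand-ins (the real `ShuffleTaskInfo` may look different), and the field is named `shuffleTaskInfos` only for illustration. Note that `Map.computeIfAbsent` takes a mapping function, so the lock is created with `x -> new Object()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the PR's ShuffleTaskInfo; only the lock map is modeled.
class TaskInfoSketch {
  // shuffleId -> lock object guarding commits for that shuffle
  private final Map<Integer, Object> commitLocks = new ConcurrentHashMap<>();

  Map<Integer, Object> getCommitLocks() {
    return commitLocks;
  }
}

// Hypothetical stand-in for the relevant part of ShuffleTaskManager.
class CommitLockSketch {
  // appId -> per-application state
  private final Map<String, TaskInfoSketch> shuffleTaskInfos = new ConcurrentHashMap<>();

  // Returns the lock for (appId, shuffleId), creating it atomically on first use.
  Object lockFor(String appId, int shuffleId) {
    TaskInfoSketch taskInfo =
        shuffleTaskInfos.computeIfAbsent(appId, x -> new TaskInfoSketch());
    return taskInfo.getCommitLocks().computeIfAbsent(shuffleId, x -> new Object());
  }
}
```

Repeated calls with the same `appId` and `shuffleId` return the same lock object, so callers can `synchronized` on it without a separate `putIfAbsent`/`get` pair.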



##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -227,21 +221,19 @@ public void addFinishedBlockIds(
   }
 
   public int updateAndGetCommitCount(String appId, int shuffleId) {
-    commitCounts.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, AtomicInteger> shuffleCommit = commitCounts.get(appId);
-    shuffleCommit.putIfAbsent(shuffleId, new AtomicInteger(0));
-    AtomicInteger commitNum = shuffleCommit.get(shuffleId);
+    shuffleTaskInfo.computeIfAbsent(appId, x -> new ShuffleTaskInfo())

Review Comment:
   ditto.
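
Applied to `updateAndGetCommitCount`, the same idea might look like the sketch below. It is only an illustration under assumptions: `CommitCountSketch` is a hypothetical class, and a nested map stands in for `ShuffleTaskInfo.getCommitCounts()`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the commit-count part of ShuffleTaskManager.
class CommitCountSketch {
  // appId -> shuffleId -> commit count (the inner map plays the role of getCommitCounts())
  private final Map<String, Map<Integer, AtomicInteger>> commitCounts =
      new ConcurrentHashMap<>();

  // Increments and returns the commit count, creating missing entries on the way.
  int updateAndGetCommitCount(String appId, int shuffleId) {
    AtomicInteger commitNum = commitCounts
        .computeIfAbsent(appId, x -> new ConcurrentHashMap<>())
        .computeIfAbsent(shuffleId, x -> new AtomicInteger(0));
    return commitNum.incrementAndGet();
  }
}
```

Keeping the value returned by `computeIfAbsent` avoids a second `get(appId)` lookup, which could return `null` if the application entry were removed between the two calls.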



##########
server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java:
##########
@@ -227,21 +221,19 @@ public void addFinishedBlockIds(
   }
 
   public int updateAndGetCommitCount(String appId, int shuffleId) {
-    commitCounts.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, AtomicInteger> shuffleCommit = commitCounts.get(appId);
-    shuffleCommit.putIfAbsent(shuffleId, new AtomicInteger(0));
-    AtomicInteger commitNum = shuffleCommit.get(shuffleId);
+    shuffleTaskInfo.computeIfAbsent(appId, x -> new ShuffleTaskInfo())
+        .getCommitCounts().putIfAbsent(shuffleId, new AtomicInteger(0));
+    AtomicInteger commitNum = shuffleTaskInfo.get(appId).getCommitCounts().get(shuffleId);
     return commitNum.incrementAndGet();
   }
 
   public void updateCachedBlockIds(String appId, int shuffleId, ShufflePartitionedBlock[] spbs) {
     if (spbs == null || spbs.length == 0) {
       return;
     }
-    cachedBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
-    Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = cachedBlockIds.get(appId);
-    shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
-    Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
+    shuffleTaskInfo.computeIfAbsent(appId, x -> new ShuffleTaskInfo())

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

