You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/05/13 03:20:53 UTC

[incubator-uniffle] branch master updated: [#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 748b7351 [#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)
748b7351 is described below

commit 748b73516cf3de12365eac8ad36bb962b0646e01
Author: Neo Chien <cc...@cs.ccu.edu.tw>
AuthorDate: Sat May 13 11:20:47 2023 +0800

    [#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)
    
    ### What changes were proposed in this pull request?
    
    Replace `putIfAbsent` (and the `containsKey` + `putIfAbsent` pattern) with `computeIfAbsent` in several critical paths.
    
    ### Why are the changes needed?
    
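    `putIfAbsent` evaluates its value argument eagerly, so a new object (for example a concurrent map, a `TreeRangeMap`, or a bitmap) is constructed on every call, even when the key is already present. `computeIfAbsent` invokes its mapping function only when the key is absent, which avoids that wasted allocation on hot paths.
    
    Fix: #493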
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests.
---
 .../hadoop/mapred/SortWriteBufferManager.java      | 15 +++++++--------
 .../org/apache/uniffle/common/util/RssUtils.java   |  2 +-
 .../uniffle/coordinator/ApplicationManager.java    | 10 ++++++----
 .../uniffle/coordinator/SimpleClusterManager.java  |  2 +-
 .../PartitionBalanceAssignmentStrategy.java        | 22 ++++++++++++----------
 .../apache/uniffle/server/ShuffleFlushManager.java |  6 ++----
 .../apache/uniffle/server/ShuffleTaskManager.java  |  8 ++++----
 .../server/buffer/ShuffleBufferManager.java        | 14 +++++++-------
 8 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index ad03751b..6e1dc171 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -160,12 +160,13 @@ public class SortWriteBufferManager<K, V> {
       memoryLock.unlock();
     }
 
-    if (!buffers.containsKey(partitionId)) {
+    buffers.computeIfAbsent(partitionId, k -> {
       SortWriteBuffer<K, V> sortWriterBuffer = new SortWriteBuffer(
-          partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
-      buffers.putIfAbsent(partitionId, sortWriterBuffer);
+              partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
       waitSendBuffers.add(sortWriterBuffer);
-    }
+      return sortWriterBuffer;
+    });
+
     SortWriteBuffer<K, V> buffer = buffers.get(partitionId);
     long length = buffer.addRecord(key, value);
     if (length > maxMemSize) {
@@ -221,9 +222,7 @@ public class SortWriteBufferManager<K, V> {
     buffer.clear();
     shuffleBlocks.add(block);
     allBlockIds.add(block.getBlockId());
-    if (!partitionToBlocks.containsKey(block.getPartitionId())) {
-      partitionToBlocks.putIfAbsent(block.getPartitionId(), Lists.newArrayList());
-    }
+    partitionToBlocks.computeIfAbsent(block.getPartitionId(), key ->  Lists.newArrayList());
     partitionToBlocks.get(block.getPartitionId()).add(block.getBlockId());
   }
 
@@ -362,7 +361,7 @@ public class SortWriteBufferManager<K, V> {
 
   // it's run in single thread, and is not thread safe
   private int getNextSeqNo(int partitionId) {
-    partitionToSeqNo.putIfAbsent(partitionId, 0);
+    partitionToSeqNo.computeIfAbsent(partitionId, key -> 0);
     int seqNo = partitionToSeqNo.get(partitionId);
     partitionToSeqNo.put(partitionId, seqNo + 1);
     return seqNo;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 3935ab2f..df2889db 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -312,7 +312,7 @@ public class RssUtils {
       Roaring64NavigableMap shuffleBitmap, int startPartition, int endPartition) {
     Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
     for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
-      result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
+      result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
     }
     Iterator<Long> it = shuffleBitmap.iterator();
     long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 613ca7e9..4aea5e12 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -140,10 +140,12 @@ public class ApplicationManager implements Closeable {
       // add remote path if not exist
       for (String path : paths) {
         if (!availableRemoteStorageInfo.containsKey(path)) {
-          remoteStoragePathRankValue.putIfAbsent(path, new RankValue(0));
-          // refreshRemoteStorage is designed without multiple thread problem
-          // metrics shouldn't be added duplicated
-          addRemoteStorageMetrics(path);
+          remoteStoragePathRankValue.computeIfAbsent(path, key -> {
+            // refreshRemoteStorage is designed without multiple thread problem
+            // metrics shouldn't be added duplicated
+            addRemoteStorageMetrics(path);
+            return new RankValue(0);
+          });
         }
         String storageHost = getStorageHost(path);
         RemoteStorageInfo rsInfo = new RemoteStorageInfo(path, confKVs.getOrDefault(storageHost, Maps.newHashMap()));
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 9359d5d4..7a4f4d7d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -210,7 +210,7 @@ public class SimpleClusterManager implements ClusterManager {
     }
     // add node to related tags
     for (String tag : tags) {
-      tagToNodes.putIfAbsent(tag, Sets.newConcurrentHashSet());
+      tagToNodes.computeIfAbsent(tag, key -> Sets.newConcurrentHashSet());
       tagToNodes.get(tag).add(node);
     }
   }
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 38b95714..6a6458f3 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -83,17 +83,19 @@ public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrate
       List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
       Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = JavaUtils.newConcurrentMap();
       for (ServerNode node : nodes) {
-        PartitionAssignmentInfo partitionInfo;
-        if (serverToPartitions.containsKey(node)) {
-          partitionInfo = serverToPartitions.get(node);
-          if (partitionInfo.getTimestamp() < node.getTimestamp()) {
-            partitionInfo.resetPartitionNum();
-            partitionInfo.setTimestamp(node.getTimestamp());
+        newPartitionInfos.computeIfAbsent(node, key -> {
+          PartitionAssignmentInfo partitionInfo;
+          if (serverToPartitions.containsKey(node)) {
+            partitionInfo = serverToPartitions.get(node);
+            if (partitionInfo.getTimestamp() < node.getTimestamp()) {
+              partitionInfo.resetPartitionNum();
+              partitionInfo.setTimestamp(node.getTimestamp());
+            }
+          } else {
+            partitionInfo = new PartitionAssignmentInfo();
           }
-        } else {
-          partitionInfo = new PartitionAssignmentInfo();
-        }
-        newPartitionInfos.putIfAbsent(node, partitionInfo);
+          return partitionInfo;
+        });
       }
       serverToPartitions = newPartitionInfos;
       int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 10dbe80f..2d9dfa5d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -276,11 +276,9 @@ public class ShuffleFlushManager {
     if (blocks == null || blocks.size() == 0) {
       return;
     }
-    if (!committedBlockIds.containsKey(appId)) {
-      committedBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
-    }
+    committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
     Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = committedBlockIds.get(appId);
-    shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
+    shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
     Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
     synchronized (bitmap) {
       for (ShufflePartitionedBlock spb : blocks) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index fb223461..64601eb9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -205,7 +205,7 @@ public class ShuffleTaskManager {
             .build()
     );
 
-    partitionsToBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+    partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
     for (PartitionRange partitionRange : partitionRanges) {
       shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
     }
@@ -302,13 +302,13 @@ public class ShuffleTaskManager {
     if (shuffleIdToPartitions == null) {
       throw new RssException("appId[" + appId  + "] is expired!");
     }
-    if (!shuffleIdToPartitions.containsKey(shuffleId)) {
+    shuffleIdToPartitions.computeIfAbsent(shuffleId, key -> {
       Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
       for (int i = 0; i < bitmapNum; i++) {
         blockIds[i] = Roaring64NavigableMap.bitmapOf();
       }
-      shuffleIdToPartitions.putIfAbsent(shuffleId, blockIds);
-    }
+      return blockIds;
+    });
     Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
     for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
       Integer partitionId = entry.getKey();
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 316ae3a3..ca5759fa 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -110,9 +110,9 @@ public class ShuffleBufferManager {
   }
 
   public StatusCode registerBuffer(String appId, int shuffleId, int startPartition, int endPartition) {
-    bufferPool.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+    bufferPool.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
     Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = bufferPool.get(appId);
-    shuffleIdToBuffers.putIfAbsent(shuffleId, TreeRangeMap.create());
+    shuffleIdToBuffers.computeIfAbsent(shuffleId, key -> TreeRangeMap.create());
     RangeMap<Integer, ShuffleBuffer> bufferRangeMap = shuffleIdToBuffers.get(shuffleId);
     if (bufferRangeMap.get(startPartition) == null) {
       ShuffleServerMetrics.counterTotalPartitionNum.inc();
@@ -163,9 +163,9 @@ public class ShuffleBufferManager {
   }
 
   private void updateShuffleSize(String appId, int shuffleId, long size) {
-    shuffleSizeMap.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+    shuffleSizeMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
     Map<Integer, AtomicLong> shuffleIdToSize = shuffleSizeMap.get(appId);
-    shuffleIdToSize.putIfAbsent(shuffleId, new AtomicLong(0));
+    shuffleIdToSize.computeIfAbsent(shuffleId, key -> new AtomicLong(0));
     shuffleIdToSize.get(shuffleId).addAndGet(size);
   }
 
@@ -537,11 +537,11 @@ public class ShuffleBufferManager {
     return Lists.newArrayList(sizeMap.entrySet());
   }
 
-  private void addPickedShuffle(String key, Map<String, Set<Integer>> pickedShuffle) {
-    String[] splits = key.split(Constants.KEY_SPLIT_CHAR);
+  private void addPickedShuffle(String shuffleIdKey, Map<String, Set<Integer>> pickedShuffle) {
+    String[] splits = shuffleIdKey.split(Constants.KEY_SPLIT_CHAR);
     String appId = splits[0];
     Integer shuffleId = Integer.parseInt(splits[1]);
-    pickedShuffle.putIfAbsent(appId, Sets.newHashSet());
+    pickedShuffle.computeIfAbsent(appId, key -> Sets.newHashSet());
     Set<Integer> shuffleIdSet = pickedShuffle.get(appId);
     shuffleIdSet.add(shuffleId);
   }