You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/05/13 03:20:53 UTC
[incubator-uniffle] branch master updated: [#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 748b7351 [#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)
748b7351 is described below
commit 748b73516cf3de12365eac8ad36bb962b0646e01
Author: Neo Chien <cc...@cs.ccu.edu.tw>
AuthorDate: Sat May 13 11:20:47 2023 +0800
[#493] improvement: replace `putIfAbsent` with `computeIfAbsent` to avoid performance loss in some critical paths (#876)
### What changes were proposed in this pull request?
Replace `putIfAbsent` with `computeIfAbsent` in some critical paths
### Why are the changes needed?
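Fix: #493. `putIfAbsent` evaluates its value argument on every call, even when the key is already present, whereas `computeIfAbsent` only builds the value when the key is actually missing.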
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by the existing unit tests (UT).
---
.../hadoop/mapred/SortWriteBufferManager.java | 15 +++++++--------
.../org/apache/uniffle/common/util/RssUtils.java | 2 +-
.../uniffle/coordinator/ApplicationManager.java | 10 ++++++----
.../uniffle/coordinator/SimpleClusterManager.java | 2 +-
.../PartitionBalanceAssignmentStrategy.java | 22 ++++++++++++----------
.../apache/uniffle/server/ShuffleFlushManager.java | 6 ++----
.../apache/uniffle/server/ShuffleTaskManager.java | 8 ++++----
.../server/buffer/ShuffleBufferManager.java | 14 +++++++-------
8 files changed, 40 insertions(+), 39 deletions(-)
diff --git a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
index ad03751b..6e1dc171 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapred/SortWriteBufferManager.java
@@ -160,12 +160,13 @@ public class SortWriteBufferManager<K, V> {
memoryLock.unlock();
}
- if (!buffers.containsKey(partitionId)) {
+ buffers.computeIfAbsent(partitionId, k -> {
SortWriteBuffer<K, V> sortWriterBuffer = new SortWriteBuffer(
- partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
- buffers.putIfAbsent(partitionId, sortWriterBuffer);
+ partitionId, comparator, maxSegmentSize, keySerializer, valSerializer);
waitSendBuffers.add(sortWriterBuffer);
- }
+ return sortWriterBuffer;
+ });
+
SortWriteBuffer<K, V> buffer = buffers.get(partitionId);
long length = buffer.addRecord(key, value);
if (length > maxMemSize) {
@@ -221,9 +222,7 @@ public class SortWriteBufferManager<K, V> {
buffer.clear();
shuffleBlocks.add(block);
allBlockIds.add(block.getBlockId());
- if (!partitionToBlocks.containsKey(block.getPartitionId())) {
- partitionToBlocks.putIfAbsent(block.getPartitionId(), Lists.newArrayList());
- }
+ partitionToBlocks.computeIfAbsent(block.getPartitionId(), key -> Lists.newArrayList());
partitionToBlocks.get(block.getPartitionId()).add(block.getBlockId());
}
@@ -362,7 +361,7 @@ public class SortWriteBufferManager<K, V> {
// it's run in single thread, and is not thread safe
private int getNextSeqNo(int partitionId) {
- partitionToSeqNo.putIfAbsent(partitionId, 0);
+ partitionToSeqNo.computeIfAbsent(partitionId, key -> 0);
int seqNo = partitionToSeqNo.get(partitionId);
partitionToSeqNo.put(partitionId, seqNo + 1);
return seqNo;
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 3935ab2f..df2889db 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -312,7 +312,7 @@ public class RssUtils {
Roaring64NavigableMap shuffleBitmap, int startPartition, int endPartition) {
Map<Integer, Roaring64NavigableMap> result = Maps.newHashMap();
for (int partitionId = startPartition; partitionId < endPartition; partitionId++) {
- result.putIfAbsent(partitionId, Roaring64NavigableMap.bitmapOf());
+ result.computeIfAbsent(partitionId, key -> Roaring64NavigableMap.bitmapOf());
}
Iterator<Long> it = shuffleBitmap.iterator();
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index 613ca7e9..4aea5e12 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -140,10 +140,12 @@ public class ApplicationManager implements Closeable {
// add remote path if not exist
for (String path : paths) {
if (!availableRemoteStorageInfo.containsKey(path)) {
- remoteStoragePathRankValue.putIfAbsent(path, new RankValue(0));
- // refreshRemoteStorage is designed without multiple thread problem
- // metrics shouldn't be added duplicated
- addRemoteStorageMetrics(path);
+ remoteStoragePathRankValue.computeIfAbsent(path, key -> {
+ // refreshRemoteStorage is designed without multiple thread problem
+ // metrics shouldn't be added duplicated
+ addRemoteStorageMetrics(path);
+ return new RankValue(0);
+ });
}
String storageHost = getStorageHost(path);
RemoteStorageInfo rsInfo = new RemoteStorageInfo(path, confKVs.getOrDefault(storageHost, Maps.newHashMap()));
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 9359d5d4..7a4f4d7d 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -210,7 +210,7 @@ public class SimpleClusterManager implements ClusterManager {
}
// add node to related tags
for (String tag : tags) {
- tagToNodes.putIfAbsent(tag, Sets.newConcurrentHashSet());
+ tagToNodes.computeIfAbsent(tag, key -> Sets.newConcurrentHashSet());
tagToNodes.get(tag).add(node);
}
}
diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
index 38b95714..6a6458f3 100644
--- a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
+++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/assignment/PartitionBalanceAssignmentStrategy.java
@@ -83,17 +83,19 @@ public class PartitionBalanceAssignmentStrategy extends AbstractAssignmentStrate
List<ServerNode> nodes = clusterManager.getServerList(requiredTags);
Map<ServerNode, PartitionAssignmentInfo> newPartitionInfos = JavaUtils.newConcurrentMap();
for (ServerNode node : nodes) {
- PartitionAssignmentInfo partitionInfo;
- if (serverToPartitions.containsKey(node)) {
- partitionInfo = serverToPartitions.get(node);
- if (partitionInfo.getTimestamp() < node.getTimestamp()) {
- partitionInfo.resetPartitionNum();
- partitionInfo.setTimestamp(node.getTimestamp());
+ newPartitionInfos.computeIfAbsent(node, key -> {
+ PartitionAssignmentInfo partitionInfo;
+ if (serverToPartitions.containsKey(node)) {
+ partitionInfo = serverToPartitions.get(node);
+ if (partitionInfo.getTimestamp() < node.getTimestamp()) {
+ partitionInfo.resetPartitionNum();
+ partitionInfo.setTimestamp(node.getTimestamp());
+ }
+ } else {
+ partitionInfo = new PartitionAssignmentInfo();
}
- } else {
- partitionInfo = new PartitionAssignmentInfo();
- }
- newPartitionInfos.putIfAbsent(node, partitionInfo);
+ return partitionInfo;
+ });
}
serverToPartitions = newPartitionInfos;
int averagePartitions = totalPartitionNum * replica / clusterManager.getShuffleNodesMax();
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 10dbe80f..2d9dfa5d 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -276,11 +276,9 @@ public class ShuffleFlushManager {
if (blocks == null || blocks.size() == 0) {
return;
}
- if (!committedBlockIds.containsKey(appId)) {
- committedBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
- }
+ committedBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Map<Integer, Roaring64NavigableMap> shuffleToBlockIds = committedBlockIds.get(appId);
- shuffleToBlockIds.putIfAbsent(shuffleId, Roaring64NavigableMap.bitmapOf());
+ shuffleToBlockIds.computeIfAbsent(shuffleId, key -> Roaring64NavigableMap.bitmapOf());
Roaring64NavigableMap bitmap = shuffleToBlockIds.get(shuffleId);
synchronized (bitmap) {
for (ShufflePartitionedBlock spb : blocks) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index fb223461..64601eb9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -205,7 +205,7 @@ public class ShuffleTaskManager {
.build()
);
- partitionsToBlockIds.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+ partitionsToBlockIds.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
}
@@ -302,13 +302,13 @@ public class ShuffleTaskManager {
if (shuffleIdToPartitions == null) {
throw new RssException("appId[" + appId + "] is expired!");
}
- if (!shuffleIdToPartitions.containsKey(shuffleId)) {
+ shuffleIdToPartitions.computeIfAbsent(shuffleId, key -> {
Roaring64NavigableMap[] blockIds = new Roaring64NavigableMap[bitmapNum];
for (int i = 0; i < bitmapNum; i++) {
blockIds[i] = Roaring64NavigableMap.bitmapOf();
}
- shuffleIdToPartitions.putIfAbsent(shuffleId, blockIds);
- }
+ return blockIds;
+ });
Roaring64NavigableMap[] blockIds = shuffleIdToPartitions.get(shuffleId);
for (Map.Entry<Integer, long[]> entry : partitionToBlockIds.entrySet()) {
Integer partitionId = entry.getKey();
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 316ae3a3..ca5759fa 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -110,9 +110,9 @@ public class ShuffleBufferManager {
}
public StatusCode registerBuffer(String appId, int shuffleId, int startPartition, int endPartition) {
- bufferPool.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+ bufferPool.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Map<Integer, RangeMap<Integer, ShuffleBuffer>> shuffleIdToBuffers = bufferPool.get(appId);
- shuffleIdToBuffers.putIfAbsent(shuffleId, TreeRangeMap.create());
+ shuffleIdToBuffers.computeIfAbsent(shuffleId, key -> TreeRangeMap.create());
RangeMap<Integer, ShuffleBuffer> bufferRangeMap = shuffleIdToBuffers.get(shuffleId);
if (bufferRangeMap.get(startPartition) == null) {
ShuffleServerMetrics.counterTotalPartitionNum.inc();
@@ -163,9 +163,9 @@ public class ShuffleBufferManager {
}
private void updateShuffleSize(String appId, int shuffleId, long size) {
- shuffleSizeMap.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+ shuffleSizeMap.computeIfAbsent(appId, key -> JavaUtils.newConcurrentMap());
Map<Integer, AtomicLong> shuffleIdToSize = shuffleSizeMap.get(appId);
- shuffleIdToSize.putIfAbsent(shuffleId, new AtomicLong(0));
+ shuffleIdToSize.computeIfAbsent(shuffleId, key -> new AtomicLong(0));
shuffleIdToSize.get(shuffleId).addAndGet(size);
}
@@ -537,11 +537,11 @@ public class ShuffleBufferManager {
return Lists.newArrayList(sizeMap.entrySet());
}
- private void addPickedShuffle(String key, Map<String, Set<Integer>> pickedShuffle) {
- String[] splits = key.split(Constants.KEY_SPLIT_CHAR);
+ private void addPickedShuffle(String shuffleIdKey, Map<String, Set<Integer>> pickedShuffle) {
+ String[] splits = shuffleIdKey.split(Constants.KEY_SPLIT_CHAR);
String appId = splits[0];
Integer shuffleId = Integer.parseInt(splits[1]);
- pickedShuffle.putIfAbsent(appId, Sets.newHashSet());
+ pickedShuffle.computeIfAbsent(appId, key -> Sets.newHashSet());
Set<Integer> shuffleIdSet = pickedShuffle.get(appId);
shuffleIdSet.add(shuffleId);
}