You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/23 06:28:46 UTC
[incubator-uniffle] branch master updated: [FOLLOWUP] Store app user in shuffleTaskInfo (#181)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5864420d [FOLLOWUP] Store app user in shuffleTaskInfo (#181)
5864420d is described below
commit 5864420d7d27c2d50ed03f0698f23556d15ff718
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:28:41 2022 +0800
[FOLLOWUP] Store app user in shuffleTaskInfo (#181)
### What changes were proposed in this pull request?
Store app user information in shuffleTaskInfo.
### Why are the changes needed?
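Reduce caching: keeping the user in ShuffleTaskInfo removes the need for the separate appId -> user map (appUserMap) in ShuffleTaskManager.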
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passed the unit tests.
---
.../java/org/apache/uniffle/server/ShuffleTaskInfo.java | 13 ++++++++++++-
.../java/org/apache/uniffle/server/ShuffleTaskManager.java | 9 +++------
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index a25dddf5..963a57b8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -19,13 +19,14 @@ package org.apache.uniffle.server;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Maps;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
/**
* ShuffleTaskInfo contains the information of submitting the shuffle,
- * the information of the cache block, and the timestamp corresponding to the app
+ * the information of the cache block, user and timestamp corresponding to the app
*/
public class ShuffleTaskInfo {
@@ -39,12 +40,14 @@ public class ShuffleTaskInfo {
* shuffleId -> blockIds
*/
private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+ private AtomicReference<String> user;
public ShuffleTaskInfo() {
this.currentTimes = System.currentTimeMillis();
this.commitCounts = Maps.newConcurrentMap();
this.commitLocks = Maps.newConcurrentMap();
this.cachedBlockIds = Maps.newConcurrentMap();
+ this.user = new AtomicReference<>();
}
public Long getCurrentTimes() {
@@ -66,4 +69,12 @@ public class ShuffleTaskInfo {
public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
return cachedBlockIds;
}
+
+ public String getUser() {
+ return user.get();
+ }
+
+ public void setUser(String user) {
+ this.user.set(user);
+ }
}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index ebf2a369..648f83f9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -78,8 +78,6 @@ public class ShuffleTaskManager {
private Map<Long, PreAllocatedBufferInfo> requireBufferIds = Maps.newConcurrentMap();
private Runnable clearResourceThread;
private BlockingQueue<String> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
- // appId -> user
- private Map<String, String> appUserMap = Maps.newConcurrentMap();
// appId -> shuffleId -> serverReadHandler
public ShuffleTaskManager(
@@ -130,7 +128,7 @@ public class ShuffleTaskManager {
RemoteStorageInfo remoteStorageInfo,
String user) {
refreshAppId(appId);
- appUserMap.putIfAbsent(appId, user);
+ shuffleTaskInfos.get(appId).setUser(user);
partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
for (PartitionRange partitionRange : partitionRanges) {
shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
@@ -384,9 +382,8 @@ public class ShuffleTaskManager {
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
- storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
+ storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), getUserByAppId(appId));
}
- appUserMap.remove(appId);
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
@@ -424,7 +421,7 @@ public class ShuffleTaskManager {
}
public String getUserByAppId(String appId) {
- return appUserMap.get(appId);
+ return shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo()).getUser();
}
@VisibleForTesting