You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/23 06:28:46 UTC

[incubator-uniffle] branch master updated: [FOLLOWUP] Store app user in shuffleTaskInfo (#181)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 5864420d [FOLLOWUP] Store app user in shuffleTaskInfo (#181)
5864420d is described below

commit 5864420d7d27c2d50ed03f0698f23556d15ff718
Author: jokercurry <84...@users.noreply.github.com>
AuthorDate: Tue Aug 23 14:28:41 2022 +0800

    [FOLLOWUP] Store app user in shuffleTaskInfo (#181)
    
    ### What changes were proposed in this pull request?
    Store app user information in shuffleTaskInfo.
    
    ### Why are the changes needed?
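    Reduce caching: the user is now kept in ShuffleTaskInfo, so ShuffleTaskManager no longer needs a separate appId -> user map.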
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passed the unit tests.
---
 .../java/org/apache/uniffle/server/ShuffleTaskInfo.java     | 13 ++++++++++++-
 .../java/org/apache/uniffle/server/ShuffleTaskManager.java  |  9 +++------
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
index a25dddf5..963a57b8 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskInfo.java
@@ -19,13 +19,14 @@ package org.apache.uniffle.server;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.Maps;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 /**
  * ShuffleTaskInfo contains the information of submitting the shuffle,
- * the information of the cache block, and the timestamp corresponding to the app
+ * the information of the cache block, user and timestamp corresponding to the app
  */
 public class ShuffleTaskInfo {
 
@@ -39,12 +40,14 @@ public class ShuffleTaskInfo {
    * shuffleId -> blockIds
     */
   private Map<Integer, Roaring64NavigableMap> cachedBlockIds;
+  private AtomicReference<String> user;
 
   public ShuffleTaskInfo() {
     this.currentTimes = System.currentTimeMillis();
     this.commitCounts = Maps.newConcurrentMap();
     this.commitLocks = Maps.newConcurrentMap();
     this.cachedBlockIds = Maps.newConcurrentMap();
+    this.user = new AtomicReference<>();
   }
 
   public Long getCurrentTimes() {
@@ -66,4 +69,12 @@ public class ShuffleTaskInfo {
   public Map<Integer, Roaring64NavigableMap> getCachedBlockIds() {
     return cachedBlockIds;
   }
+
+  public String getUser() {
+    return user.get();
+  }
+
+  public void setUser(String user) {
+    this.user.set(user);
+  }
 }
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index ebf2a369..648f83f9 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -78,8 +78,6 @@ public class ShuffleTaskManager {
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = Maps.newConcurrentMap();
   private Runnable clearResourceThread;
   private BlockingQueue<String> expiredAppIdQueue = Queues.newLinkedBlockingQueue();
-  // appId -> user
-  private Map<String, String> appUserMap = Maps.newConcurrentMap();
   // appId -> shuffleId -> serverReadHandler
 
   public ShuffleTaskManager(
@@ -130,7 +128,7 @@ public class ShuffleTaskManager {
       RemoteStorageInfo remoteStorageInfo,
       String user) {
     refreshAppId(appId);
-    appUserMap.putIfAbsent(appId, user);
+    shuffleTaskInfos.get(appId).setUser(user);
     partitionsToBlockIds.putIfAbsent(appId, Maps.newConcurrentMap());
     for (PartitionRange partitionRange : partitionRanges) {
       shuffleBufferManager.registerBuffer(appId, shuffleId, partitionRange.getStart(), partitionRange.getEnd());
@@ -384,9 +382,8 @@ public class ShuffleTaskManager {
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
     if (!shuffleToCachedBlockIds.isEmpty()) {
-      storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
+      storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), getUserByAppId(appId));
     }
-    appUserMap.remove(appId);
     shuffleTaskInfos.remove(appId);
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
   }
@@ -424,7 +421,7 @@ public class ShuffleTaskManager {
   }
 
   public String getUserByAppId(String appId) {
-    return appUserMap.get(appId);
+    return shuffleTaskInfos.computeIfAbsent(appId, x -> new ShuffleTaskInfo()).getUser();
   }
 
   @VisibleForTesting