You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/22 02:37:18 UTC

[incubator-uniffle] branch master updated: [FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 270e2ada [FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
270e2ada is described below

commit 270e2ada1d6603125ff5df472040ce1357425d7d
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Mon Aug 22 10:37:14 2022 +0800

    [FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
    
    ### What changes were proposed in this pull request?
    Delete shuffle data files stored on secured HDFS as the proxy user.
    
    ### Why are the changes needed?
    In the previous PR #53, we introduced the proxy user so that the shuffle server could write shuffle data to HDFS.
    However, I overlooked that the shuffle server must also delete these files as the proxy user.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Manual tests on a Kerberos-enabled HDFS cluster, plus unit tests.
---
 server/pom.xml                                     |  10 ++
 .../apache/uniffle/server/ShuffleTaskManager.java  |   4 +-
 .../uniffle/server/storage/HdfsStorageManager.java |   4 +-
 .../server/storage/LocalStorageManager.java        |   4 +-
 .../server/storage/MultiStorageManager.java        |   6 +-
 .../uniffle/server/storage/StorageManager.java     |   2 +-
 .../ShuffleFlushManagerOnKerberizedHdfsTest.java   | 160 +++++++++++++++++++++
 .../uniffle/server/ShuffleFlushManagerTest.java    |  15 +-
 .../storage/handler/api/ShuffleDeleteHandler.java  |   2 +-
 .../handler/impl/HdfsShuffleDeleteHandler.java     |   6 +-
 .../handler/impl/LocalFileDeleteHandler.java       |   2 +-
 11 files changed, 193 insertions(+), 22 deletions(-)

diff --git a/server/pom.xml b/server/pom.xml
index 6318154c..ba005a85 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -107,6 +107,16 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minikdc</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5fc6ca84..ebf2a369 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -383,10 +383,10 @@ public class ShuffleTaskManager {
     partitionsToBlockIds.remove(appId);
     shuffleBufferManager.removeBuffer(appId);
     shuffleFlushManager.removeResources(appId);
-    appUserMap.remove(appId);
     if (!shuffleToCachedBlockIds.isEmpty()) {
-      storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
+      storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
     }
+    appUserMap.remove(appId);
     shuffleTaskInfos.remove(appId);
     LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
   }
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 04d65b13..6fa50d17 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -69,7 +69,7 @@ public class HdfsStorageManager extends SingleStorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet) {
+  public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
     HdfsStorage storage = getStorageByAppId(appId);
     if (storage != null) {
       storage.removeHandlers(appId);
@@ -77,7 +77,7 @@ public class HdfsStorageManager extends SingleStorageManager {
       ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
           .createShuffleDeleteHandler(
               new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), storage.getConf()));
-      deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+      deleteHandler.delete(new String[] {storage.getStoragePath()}, appId, user);
     }
   }
 
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index f0ed715d..1bbc6298 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -171,7 +171,7 @@ public class LocalStorageManager extends SingleStorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet) {
+  public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
     for (LocalStorage storage : localStorages) {
       for (Integer shuffleId : shuffleSet) {
         storage.removeHandlers(appId);
@@ -182,7 +182,7 @@ public class LocalStorageManager extends SingleStorageManager {
     ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
         .createShuffleDeleteHandler(
             new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration()));
-    deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId);
+    deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId, user);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 633c2ae9..d4fff0a9 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -112,10 +112,10 @@ public class MultiStorageManager implements StorageManager {
   }
 
   @Override
-  public void removeResources(String appId, Set<Integer> shuffleSet) {
+  public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
     LOG.info("Start to remove resource of appId: {}, shuffles: {}", appId, shuffleSet.toString());
-    warmStorageManager.removeResources(appId, shuffleSet);
-    coldStorageManager.removeResources(appId, shuffleSet);
+    warmStorageManager.removeResources(appId, shuffleSet, user);
+    coldStorageManager.removeResources(appId, shuffleSet, user);
   }
 
   public StorageManager getColdStorageManager() {
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 90162ca5..d8fe5c9e 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -39,7 +39,7 @@ public interface StorageManager {
 
   // todo: add an interface for updateReadMetrics
 
-  void removeResources(String appId, Set<Integer> shuffleSet);
+  void removeResources(String appId, Set<Integer> shuffleSet, String user);
 
   void start();
 
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
new file mode 100644
index 00000000..642d7ab3
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.server.storage.StorageManagerFactory;
+import org.apache.uniffle.storage.common.AbstractStorage;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.server.ShuffleFlushManagerTest.createShuffleDataFlushEvent;
+import static org.apache.uniffle.server.ShuffleFlushManagerTest.waitForFlush;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ShuffleFlushManagerOnKerberizedHdfsTest extends KerberizedHdfsBase {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleFlushManagerOnKerberizedHdfsTest.class);
+
+  private ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+
+  private static RemoteStorageInfo remoteStorage;
+  private static ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+
+  @BeforeEach
+  public void prepare() throws Exception {
+    ShuffleServerMetrics.register();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+    LogManager.getRootLogger().setLevel(Level.INFO);
+
+    initHadoopSecurityContext();
+  }
+
+  @AfterEach
+  public void afterEach() {
+    ShuffleServerMetrics.clear();
+  }
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = ShuffleFlushManagerOnKerberizedHdfsTest.class;
+    KerberizedHdfsBase.init();
+
+    ShuffleTaskManager shuffleTaskManager = mock(ShuffleTaskManager.class);
+    ShuffleBufferManager shuffleBufferManager = mock(ShuffleBufferManager.class);
+
+    when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager);
+    when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
+
+    String storedPath = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/alex/rss-data/";
+    Map<String, String> confMap = new HashMap<>();
+    for (Map.Entry<String, String> entry : kerberizedHdfs.getConf()) {
+      confMap.put(entry.getKey(), entry.getValue());
+    }
+    remoteStorage = new RemoteStorageInfo(
+        storedPath, confMap
+    );
+  }
+
+  @Test
+  public void clearTest() throws Exception {
+    String appId1 = "complexWriteTest_appId1";
+    String appId2 = "complexWriteTest_appId2";
+
+    when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId1)).thenReturn("alex");
+    when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId2)).thenReturn("alex");
+
+    StorageManager storageManager =
+        StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    storageManager.registerRemoteStorage(appId1, remoteStorage);
+    storageManager.registerRemoteStorage(appId2, remoteStorage);
+    ShuffleFlushManager manager =
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+    ShuffleDataFlushEvent event1 =
+        createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
+    manager.addToFlushQueue(event1);
+    ShuffleDataFlushEvent event2 =
+        createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
+    manager.addToFlushQueue(event2);
+    waitForFlush(manager, appId1, 1, 5);
+    waitForFlush(manager, appId2, 1, 5);
+    AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
+    assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
+    assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+    assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
+    int size = storage.getHandlerSize();
+    assertEquals(2, size);
+
+    FileStatus[] fileStatus = kerberizedHdfs.getFileSystem()
+        .listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
+    for (FileStatus fileState : fileStatus) {
+      assertEquals("alex", fileState.getOwner());
+    }
+    assertTrue(fileStatus.length > 0);
+    manager.removeResources(appId1);
+
+    assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
+    storageManager.removeResources(appId1, Sets.newHashSet(1), "alex");
+    assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
+    try {
+      kerberizedHdfs.getFileSystem().listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
+      fail("Exception should be thrown");
+    } catch (FileNotFoundException fnfe) {
+      // expected exception
+    }
+
+    assertTrue(kerberizedHdfs.getFileSystem().exists(new Path(remoteStorage.getPath())));
+
+    assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
+    assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+    size = storage.getHandlerSize();
+    assertEquals(1, size);
+    manager.removeResources(appId2);
+    assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
+    storageManager.removeResources(appId2, Sets.newHashSet(1), "alex");
+    assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
+    assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+    size = storage.getHandlerSize();
+    assertEquals(0, size);
+  }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 67836def..ebdd12ec 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import io.prometheus.client.Gauge;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -225,7 +226,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     manager.removeResources(appId1);
 
     assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
-    storageManager.removeResources(appId1, Sets.newHashSet(1));
+    storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
     assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
     try {
       fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -240,7 +241,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(1, size);
     manager.removeResources(appId2);
     assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
-    storageManager.removeResources(appId2, Sets.newHashSet(1));
+    storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
     assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
     assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
     size = storage.getHandlerSize();
@@ -274,7 +275,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(2, storage.getHandlerSize());
     File file = new File(tempDir, appId1);
     assertTrue(file.exists());
-    storageManager.removeResources(appId1, Sets.newHashSet(1));
+    storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
     manager.removeResources(appId1);
     assertFalse(file.exists());
     ShuffleDataFlushEvent event3 =
@@ -285,7 +286,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
     assertEquals(1, storage.getHandlerSize());
     manager.removeResources(appId2);
-    storageManager.removeResources(appId2, Sets.newHashSet(1));
+    storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
     assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
     assertEquals(0, storage.getHandlerSize());
   }
@@ -321,7 +322,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     } while (size > 0);
   }
 
-  private void waitForFlush(ShuffleFlushManager manager,
+  public static void waitForFlush(ShuffleFlushManager manager,
       String appId, int shuffleId, int expectedBlockNum) throws Exception {
     int retry = 0;
     int size = 0;
@@ -335,14 +336,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     } while (size < expectedBlockNum);
   }
 
-  private ShuffleDataFlushEvent createShuffleDataFlushEvent(
+  public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
       String appId, int shuffleId, int startPartition, int endPartition, Supplier<Boolean> isValid) {
     List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
     return new ShuffleDataFlushEvent(ATOMIC_LONG.getAndIncrement(),
         appId, shuffleId, startPartition, endPartition, 1, spbs, isValid, null);
   }
 
-  private List<ShufflePartitionedBlock> createBlock(int num, int length) {
+  public static List<ShufflePartitionedBlock> createBlock(int num, int length) {
     List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
     for (int i = 0; i < num; i++) {
       byte[] buf = new byte[length];
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
index 1bce4378..c0b32b94 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
@@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
    *
    * @param appId ApplicationId for delete
    */
-  void delete(String[] storageBasePaths, String appId);
+  void delete(String[] storageBasePaths, String appId, String user);
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
index 22f4f0ca..d1201a87 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
@@ -38,16 +38,16 @@ public class HdfsShuffleDeleteHandler implements ShuffleDeleteHandler {
   }
 
   @Override
-  public void delete(String[] storageBasePaths, String appId) {
+  public void delete(String[] storageBasePaths, String appId, String user) {
     Path path = new Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
     boolean isSuccess = false;
     int times = 0;
     int retryMax = 5;
     long start = System.currentTimeMillis();
-    LOG.info("Try delete shuffle data in HDFS for appId[" + appId + "] with " + path);
+    LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with {}",appId, user, path);
     while (!isSuccess && times < retryMax) {
       try {
-        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
+        FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
         fileSystem.delete(path, true);
         isSuccess = true;
       } catch (Exception e) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index bde1c28b..97bf3864 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -31,7 +31,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
   private static final Logger LOG = LoggerFactory.getLogger(LocalFileDeleteHandler.class);
 
   @Override
-  public void delete(String[] storageBasePaths, String appId) {
+  public void delete(String[] storageBasePaths, String appId, String user) {
     for (String basePath : storageBasePaths) {
       String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
       long start = System.currentTimeMillis();