Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/08/22 02:37:18 UTC
[incubator-uniffle] branch master updated: [FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 270e2ada [FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
270e2ada is described below
commit 270e2ada1d6603125ff5df472040ce1357425d7d
Author: Junfan Zhang <ju...@outlook.com>
AuthorDate: Mon Aug 22 10:37:14 2022 +0800
[FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
### What changes were proposed in this pull request?
Delete shuffle data files stored on secured HDFS using the proxy user.
### Why are the changes needed?
In the previous PR #53, we introduced the proxy user for the shuffle server to write shuffle data to HDFS.
But I overlooked that the shuffle server should also delete these files as the proxy user.
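For context, impersonation on a Kerberized HDFS goes through Hadoop's `UserGroupInformation`; below is a minimal sketch of deleting a path as a proxy user, assuming the server has already logged in with its own keytab. The helper class is illustrative only; the real logic lives in `HdfsShuffleDeleteHandler` via `HadoopFilesystemProvider`.

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserDeleteSketch {
  // Illustrative only: delete an app's shuffle folder while impersonating its user.
  public static boolean deleteAsProxyUser(String user, Path path, Configuration conf)
      throws Exception {
    // Wrap the server's Kerberos login with the application user's identity.
    UserGroupInformation proxyUgi =
        UserGroupInformation.createProxyUser(user, UserGroupInformation.getLoginUser());
    return proxyUgi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
      FileSystem fs = path.getFileSystem(conf);
      return fs.delete(path, true); // recursive delete, matching the handler's behavior
    });
  }
}
```

Note that impersonation also has to be permitted on the NameNode side via the `hadoop.proxyuser.<serverUser>.hosts` and `hadoop.proxyuser.<serverUser>.groups` settings.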
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manual tests on a Kerberized HDFS cluster, and unit tests.
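For reference, the new unit test relies on the `hadoop-minikdc` dependency added below; a minimal sketch of booting an in-process KDC for such tests (the principal names are illustrative, not the ones the test suite uses):

```java
import java.io.File;
import java.util.Properties;

import org.apache.hadoop.minikdc.MiniKdc;

public class MiniKdcSketch {
  public static void main(String[] args) throws Exception {
    // Start an in-process KDC so tests can exercise Kerberos code paths.
    Properties conf = MiniKdc.createConf();
    File workDir = new File(System.getProperty("java.io.tmpdir"), "minikdc");
    MiniKdc kdc = new MiniKdc(conf, workDir);
    kdc.start();

    // Create a keytab holding an illustrative user and an HDFS service principal.
    File keytab = new File(workDir, "test.keytab");
    kdc.createPrincipal(keytab, "alex", "hdfs/localhost");
    System.out.println("KDC realm: " + kdc.getRealm());

    kdc.stop();
  }
}
```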
---
server/pom.xml | 10 ++
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +-
.../uniffle/server/storage/HdfsStorageManager.java | 4 +-
.../server/storage/LocalStorageManager.java | 4 +-
.../server/storage/MultiStorageManager.java | 6 +-
.../uniffle/server/storage/StorageManager.java | 2 +-
.../ShuffleFlushManagerOnKerberizedHdfsTest.java | 160 +++++++++++++++++++++
.../uniffle/server/ShuffleFlushManagerTest.java | 15 +-
.../storage/handler/api/ShuffleDeleteHandler.java | 2 +-
.../handler/impl/HdfsShuffleDeleteHandler.java | 6 +-
.../handler/impl/LocalFileDeleteHandler.java | 2 +-
11 files changed, 193 insertions(+), 22 deletions(-)
diff --git a/server/pom.xml b/server/pom.xml
index 6318154c..ba005a85 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -107,6 +107,16 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-minikdc</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<build>
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 5fc6ca84..ebf2a369 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -383,10 +383,10 @@ public class ShuffleTaskManager {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
- appUserMap.remove(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
- storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
+ storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
}
+ appUserMap.remove(appId);
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 04d65b13..6fa50d17 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -69,7 +69,7 @@ public class HdfsStorageManager extends SingleStorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet) {
+ public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
HdfsStorage storage = getStorageByAppId(appId);
if (storage != null) {
storage.removeHandlers(appId);
@@ -77,7 +77,7 @@ public class HdfsStorageManager extends SingleStorageManager {
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), storage.getConf()));
- deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
+ deleteHandler.delete(new String[] {storage.getStoragePath()}, appId, user);
}
}
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index f0ed715d..1bbc6298 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -171,7 +171,7 @@ public class LocalStorageManager extends SingleStorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet) {
+ public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
for (LocalStorage storage : localStorages) {
for (Integer shuffleId : shuffleSet) {
storage.removeHandlers(appId);
@@ -182,7 +182,7 @@ public class LocalStorageManager extends SingleStorageManager {
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration()));
- deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId);
+ deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId, user);
}
@Override
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 633c2ae9..d4fff0a9 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -112,10 +112,10 @@ public class MultiStorageManager implements StorageManager {
}
@Override
- public void removeResources(String appId, Set<Integer> shuffleSet) {
+ public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
LOG.info("Start to remove resource of appId: {}, shuffles: {}", appId, shuffleSet.toString());
- warmStorageManager.removeResources(appId, shuffleSet);
- coldStorageManager.removeResources(appId, shuffleSet);
+ warmStorageManager.removeResources(appId, shuffleSet, user);
+ coldStorageManager.removeResources(appId, shuffleSet, user);
}
public StorageManager getColdStorageManager() {
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index 90162ca5..d8fe5c9e 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -39,7 +39,7 @@ public interface StorageManager {
// todo: add an interface for updateReadMetrics
- void removeResources(String appId, Set<Integer> shuffleSet);
+ void removeResources(String appId, Set<Integer> shuffleSet, String user);
void start();
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
new file mode 100644
index 00000000..642d7ab3
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.io.FileNotFoundException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.Sets;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.KerberizedHdfsBase;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.storage.HdfsStorageManager;
+import org.apache.uniffle.server.storage.StorageManager;
+import org.apache.uniffle.server.storage.StorageManagerFactory;
+import org.apache.uniffle.storage.common.AbstractStorage;
+import org.apache.uniffle.storage.util.StorageType;
+
+import static org.apache.uniffle.server.ShuffleFlushManagerTest.createShuffleDataFlushEvent;
+import static org.apache.uniffle.server.ShuffleFlushManagerTest.waitForFlush;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ShuffleFlushManagerOnKerberizedHdfsTest extends KerberizedHdfsBase {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleFlushManagerOnKerberizedHdfsTest.class);
+
+ private ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+
+ private static RemoteStorageInfo remoteStorage;
+ private static ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
+
+ @BeforeEach
+ public void prepare() throws Exception {
+ ShuffleServerMetrics.register();
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
+ shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ LogManager.getRootLogger().setLevel(Level.INFO);
+
+ initHadoopSecurityContext();
+ }
+
+ @AfterEach
+ public void afterEach() {
+ ShuffleServerMetrics.clear();
+ }
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ testRunner = ShuffleFlushManagerOnKerberizedHdfsTest.class;
+ KerberizedHdfsBase.init();
+
+ ShuffleTaskManager shuffleTaskManager = mock(ShuffleTaskManager.class);
+ ShuffleBufferManager shuffleBufferManager = mock(ShuffleBufferManager.class);
+
+ when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager);
+ when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);
+
+ String storedPath = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/alex/rss-data/";
+ Map<String, String> confMap = new HashMap<>();
+ for (Map.Entry<String, String> entry : kerberizedHdfs.getConf()) {
+ confMap.put(entry.getKey(), entry.getValue());
+ }
+ remoteStorage = new RemoteStorageInfo(
+ storedPath, confMap
+ );
+ }
+
+ @Test
+ public void clearTest() throws Exception {
+ String appId1 = "complexWriteTest_appId1";
+ String appId2 = "complexWriteTest_appId2";
+
+ when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId1)).thenReturn("alex");
+ when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId2)).thenReturn("alex");
+
+ StorageManager storageManager =
+ StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ storageManager.registerRemoteStorage(appId1, remoteStorage);
+ storageManager.registerRemoteStorage(appId2, remoteStorage);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+ ShuffleDataFlushEvent event1 =
+ createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
+ manager.addToFlushQueue(event1);
+ ShuffleDataFlushEvent event2 =
+ createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
+ manager.addToFlushQueue(event2);
+ waitForFlush(manager, appId1, 1, 5);
+ waitForFlush(manager, appId2, 1, 5);
+ AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
+ assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
+ assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+ assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
+ int size = storage.getHandlerSize();
+ assertEquals(2, size);
+
+ FileStatus[] fileStatus = kerberizedHdfs.getFileSystem()
+ .listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
+ for (FileStatus fileState : fileStatus) {
+ assertEquals("alex", fileState.getOwner());
+ }
+ assertTrue(fileStatus.length > 0);
+ manager.removeResources(appId1);
+
+ assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
+ storageManager.removeResources(appId1, Sets.newHashSet(1), "alex");
+ assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
+ try {
+ kerberizedHdfs.getFileSystem().listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
+ fail("Exception should be thrown");
+ } catch (FileNotFoundException fnfe) {
+ // expected exception
+ }
+
+ assertTrue(kerberizedHdfs.getFileSystem().exists(new Path(remoteStorage.getPath())));
+
+ assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
+ assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+ size = storage.getHandlerSize();
+ assertEquals(1, size);
+ manager.removeResources(appId2);
+ assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
+ storageManager.removeResources(appId2, Sets.newHashSet(1), "alex");
+ assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
+ assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
+ size = storage.getHandlerSize();
+ assertEquals(0, size);
+ }
+}
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 67836def..ebdd12ec 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.prometheus.client.Gauge;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -225,7 +226,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
manager.removeResources(appId1);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
- storageManager.removeResources(appId1, Sets.newHashSet(1));
+ storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
try {
fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -240,7 +241,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
- storageManager.removeResources(appId2, Sets.newHashSet(1));
+ storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
@@ -274,7 +275,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
- storageManager.removeResources(appId1, Sets.newHashSet(1));
+ storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataFlushEvent event3 =
@@ -285,7 +286,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
- storageManager.removeResources(appId2, Sets.newHashSet(1));
+ storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
}
@@ -321,7 +322,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
} while (size > 0);
}
- private void waitForFlush(ShuffleFlushManager manager,
+ public static void waitForFlush(ShuffleFlushManager manager,
String appId, int shuffleId, int expectedBlockNum) throws Exception {
int retry = 0;
int size = 0;
@@ -335,14 +336,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
} while (size < expectedBlockNum);
}
- private ShuffleDataFlushEvent createShuffleDataFlushEvent(
+ public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId, int shuffleId, int startPartition, int endPartition, Supplier<Boolean> isValid) {
List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
return new ShuffleDataFlushEvent(ATOMIC_LONG.getAndIncrement(),
appId, shuffleId, startPartition, endPartition, 1, spbs, isValid, null);
}
- private List<ShufflePartitionedBlock> createBlock(int num, int length) {
+ public static List<ShufflePartitionedBlock> createBlock(int num, int length) {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
index 1bce4378..c0b32b94 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
@@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
*
* @param appId ApplicationId for delete
*/
- void delete(String[] storageBasePaths, String appId);
+ void delete(String[] storageBasePaths, String appId, String user);
}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
index 22f4f0ca..d1201a87 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsShuffleDeleteHandler.java
@@ -38,16 +38,16 @@ public class HdfsShuffleDeleteHandler implements ShuffleDeleteHandler {
}
@Override
- public void delete(String[] storageBasePaths, String appId) {
+ public void delete(String[] storageBasePaths, String appId, String user) {
Path path = new Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
boolean isSuccess = false;
int times = 0;
int retryMax = 5;
long start = System.currentTimeMillis();
- LOG.info("Try delete shuffle data in HDFS for appId[" + appId + "] with " + path);
+ LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with {}",appId, user, path);
while (!isSuccess && times < retryMax) {
try {
- FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
+ FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
fileSystem.delete(path, true);
isSuccess = true;
} catch (Exception e) {
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index bde1c28b..97bf3864 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -31,7 +31,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileDeleteHandler.class);
@Override
- public void delete(String[] storageBasePaths, String appId) {
+ public void delete(String[] storageBasePaths, String appId, String user) {
for (String basePath : storageBasePaths) {
String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
long start = System.currentTimeMillis();