Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/03/21 03:38:39 UTC
[incubator-uniffle] branch master updated: [#736] feat(storage): best effort to write same hdfs file when no race condition (#744)
This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 64cf46bd [#736] feat(storage): best effort to write same hdfs file when no race condition (#744)
64cf46bd is described below
commit 64cf46bdb6a2b1d6fd8790bc00359ed69ced4856
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Mar 21 11:38:34 2023 +0800
[#736] feat(storage): best effort to write same hdfs file when no race condition (#744)
### What changes were proposed in this pull request?
Make a best effort to write to the same HDFS file when there is no race condition; verified in `PooledHdfsShuffleWriteHandlerTest`.
### Why are the changes needed?
1. Reduce the number of files per partition to lower the pressure on HDFS.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
1. UTs
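For context, here is a minimal sketch of the pooling pattern this change adopts, using hypothetical names (`Writer`, `PooledWriter`) rather than the actual Uniffle classes: handlers are taken from the head of a deque and returned to the head, so an uncontended caller keeps reusing the same handler, and therefore keeps appending to the same HDFS file.

```java
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical stand-in for HdfsShuffleWriteHandler.
interface Writer {
    void write(byte[] data) throws Exception;
}

// Sketch of the LIFO pool: take() from the head, addFirst() back to the head.
class PooledWriter implements Writer {
    private final LinkedBlockingDeque<Writer> pool;

    PooledWriter(LinkedBlockingDeque<Writer> pool) {
        this.pool = pool;
    }

    @Override
    public void write(byte[] data) throws Exception {
        Writer writer = pool.take();   // blocks only when every writer is busy
        try {
            writer.write(data);
        } finally {
            pool.addFirst(writer);     // LIFO return keeps the head writer "hot"
        }
    }
}
```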
---
.../uniffle/server/ShuffleFlushManagerTest.java | 32 ++++++
.../impl/PooledHdfsShuffleWriteHandler.java | 33 ++++--
.../impl/PooledHdfsShuffleWriteHandlerTest.java | 113 +++++++++++++++++++++
3 files changed, 170 insertions(+), 8 deletions(-)
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 7ccaed9b..2d075b8e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -118,6 +118,38 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
assertEquals("value", manager.getHadoopConf().get("a.b"));
}
+ /**
+ * When concurrent writing of single-partition data is enabled,
+ * it should always write to one file if there is no race condition.
+ */
+ @Test
+ public void concurrentWrite2HdfsWriteOneByOne() throws Exception {
+ ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+ shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
+ shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ int maxConcurrency = 3;
+ shuffleServerConf.setInteger(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, maxConcurrency);
+
+ String appId = "concurrentWrite2HdfsWriteOneByOne_appId";
+ StorageManager storageManager =
+ StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+ storageManager.registerRemoteStorage(appId, remoteStorage);
+ ShuffleFlushManager manager =
+ new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+
+ for (int i = 0; i < 10; i++) {
+ ShuffleDataFlushEvent shuffleDataFlushEvent = createShuffleDataFlushEvent(appId, i, 1, 1, null);
+ manager.addToFlushQueue(shuffleDataFlushEvent);
+ waitForFlush(manager, appId, i, 5);
+ }
+
+ FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" + appId + "/1/1-1"));
+ long actual = Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("data")).count();
+ assertEquals(1, actual);
+ actual = Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("index")).count();
+ assertEquals(1, actual);
+ }
+
@Test
public void concurrentWrite2HdfsWriteOfSinglePartition() throws Exception {
ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
index 6b2188a7..d225448d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -18,9 +18,10 @@
package org.apache.uniffle.storage.handler.impl;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -29,13 +30,29 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+/**
+ * The {@link PooledHdfsShuffleWriteHandler} wraps multiple underlying
+ * {@link HdfsShuffleWriteHandler} instances to control the concurrency of
+ * writing a single partition to multiple files.
+ *
+ * By leveraging {@link LinkedBlockingDeque}, it always writes to the same file
+ * when there is no race condition, which reduces the number of files on HDFS.
+ */
public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
- private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+ private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
private final int maxConcurrency;
private final String basePath;
+ // Only for tests
+ @VisibleForTesting
+ public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> queue) {
+ this.queue = queue;
+ this.maxConcurrency = queue.size();
+ this.basePath = StringUtils.EMPTY;
+ }
+
public PooledHdfsShuffleWriteHandler(
String appId,
int shuffleId,
@@ -48,7 +65,7 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
int concurrency) {
// todo: support max concurrency specified by client side
this.maxConcurrency = concurrency;
- this.queue = new LinkedBlockingQueue<>(maxConcurrency);
+ this.queue = new LinkedBlockingDeque<>(maxConcurrency);
this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
@@ -80,13 +97,13 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
if (queue.isEmpty()) {
LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
}
- HdfsShuffleWriteHandler writeHandler = queue.take();
+ ShuffleWriteHandler writeHandler = queue.take();
try {
writeHandler.write(shuffleBlocks);
} finally {
- // Use add() here because we are sure the capacity will not be exceeded.
- // Note: add() throws IllegalStateException when queue is full.
- queue.add(writeHandler);
+ // Use addFirst() here because we are sure the capacity will not be exceeded.
+ // Note: addFirst() throws IllegalStateException when queue is full.
+ queue.addFirst(writeHandler);
}
}
}
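The switch from `LinkedBlockingQueue` to `LinkedBlockingDeque` above is the heart of the change: a FIFO queue rotates through all pooled handlers even when callers arrive one at a time, while returning handlers with `addFirst()` makes an uncontended caller always take the same one. A small self-contained demo (hypothetical `QueueVsDequeDemo`, not part of the patch) illustrates the difference:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueVsDequeDemo {
    public static void main(String[] args) throws InterruptedException {
        // FIFO: the returned element goes to the tail, so sequential
        // take/add cycles rotate through every pooled element.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(2);
        queue.add("h1");
        queue.add("h2");
        for (int i = 0; i < 4; i++) {
            String h = queue.take();
            System.out.print(h + " ");   // prints: h1 h2 h1 h2
            queue.add(h);
        }
        System.out.println();

        // LIFO: addFirst() puts the element back at the head, so an
        // uncontended caller always takes the same one.
        LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(2);
        deque.add("h1");
        deque.add("h2");
        for (int i = 0; i < 4; i++) {
            String h = deque.take();
            System.out.print(h + " ");   // prints: h1 h1 h1 h1
            deque.addFirst(h);
        }
        System.out.println();
    }
}
```

Mapped back to the handler pool, `h1` corresponds to the handler that owns the first data/index file pair, so under sequential flushes only one file pair is ever created.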
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
new file mode 100644
index 00000000..ca62b835
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PooledHdfsShuffleWriteHandlerTest {
+
+ static class FakedShuffleWriteHandler implements ShuffleWriteHandler {
+ private List<Integer> invokedList;
+ private int index;
+ private Runnable execution;
+
+ FakedShuffleWriteHandler(List<Integer> invokedList, int index, Runnable runnable) {
+ this.invokedList = invokedList;
+ this.index = index;
+ this.execution = runnable;
+ }
+
+ @Override
+ public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
+ execution.run();
+ invokedList.add(index);
+ }
+ }
+
+ @Test
+ public void writeSameFileWhenNoRaceCondition() throws Exception {
+ int concurrency = 5;
+ CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
+ for (int i = 0; i < concurrency; i++) {
+ deque.addFirst(
+ new FakedShuffleWriteHandler(invokedIndexes, i, () -> {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException interruptedException) {
+ // ignore
+ }
+ })
+ );
+ }
+ PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(deque);
+
+ for (int i = 0; i < 10; i++) {
+ handler.write(Collections.emptyList());
+ }
+ assertEquals(10, invokedIndexes.size());
+ assertEquals(10, invokedIndexes.stream().filter(x -> x == 4).count());
+ }
+
+ @Test
+ public void concurrentWrite() throws InterruptedException {
+ int concurrency = 5;
+ CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
+ LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
+ for (int i = 0; i < concurrency; i++) {
+ deque.addFirst(
+ new FakedShuffleWriteHandler(invokedIndexes, i, () -> {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException interruptedException) {
+ // ignore
+ }
+ })
+ );
+ }
+ PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(deque);
+
+ ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+ for (int i = 0; i < concurrency; i++) {
+ executorService.submit(() -> {
+ try {
+ handler.write(Collections.emptyList());
+ } catch (Exception e) {
+ // ignore
+ e.printStackTrace();
+ }
+ });
+ }
+ Awaitility.await().timeout(2, TimeUnit.SECONDS).until(() -> invokedIndexes.size() == concurrency);
+ executorService.shutdownNow();
+ }
+}