Posted to commits@uniffle.apache.org by zu...@apache.org on 2023/03/21 03:38:39 UTC

[incubator-uniffle] branch master updated: [#736] feat(storage): best effort to write same hdfs file when no race condition (#744)

This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 64cf46bd [#736] feat(storage): best effort to write same hdfs file when no race condition (#744)
64cf46bd is described below

commit 64cf46bdb6a2b1d6fd8790bc00359ed69ced4856
Author: Junfan Zhang <zu...@apache.org>
AuthorDate: Tue Mar 21 11:38:34 2023 +0800

    [#736] feat(storage): best effort to write same hdfs file when no race condition (#744)
    
    ### What changes were proposed in this pull request?
    
    Make a best effort to write to the same HDFS file when there is no race condition, as verified by `PooledHdfsShuffleWriteHandlerTest`.
    
    ### Why are the changes needed?
    
    1. Reduce the number of files per partition to reduce pressure on HDFS.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    1. UTs
---
 .../uniffle/server/ShuffleFlushManagerTest.java    |  32 ++++++
 .../impl/PooledHdfsShuffleWriteHandler.java        |  33 ++++--
 .../impl/PooledHdfsShuffleWriteHandlerTest.java    | 113 +++++++++++++++++++++
 3 files changed, 170 insertions(+), 8 deletions(-)
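
The core of the patch is to swap the handler pool from a FIFO `LinkedBlockingQueue` to a `LinkedBlockingDeque` and to return handlers with `addFirst()`: a handler that has just finished writing goes back to the head of the deque, so the next sequential write takes the same handler and keeps appending to the same file. A minimal standalone sketch of that LIFO reuse (the `handler-N` strings are hypothetical stand-ins for real write handlers, not code from this patch):

```java
import java.util.concurrent.LinkedBlockingDeque;

public class DequeReuseDemo {
  public static void main(String[] args) throws InterruptedException {
    LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(3);
    for (int i = 0; i < 3; i++) {
      deque.addFirst("handler-" + i); // head ends up as handler-2
    }
    for (int i = 0; i < 5; i++) {
      String handler = deque.take(); // take() removes from the head
      System.out.println(handler);   // prints "handler-2" every time
      deque.addFirst(handler);       // return to the head, not the tail
    }
  }
}
```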

diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 7ccaed9b..2d075b8e 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -118,6 +118,38 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals("value", manager.getHadoopConf().get("a.b"));
   }
 
+  /**
+   * When concurrent writing of single-partition data is enabled,
+   * it should always write one file if there is no race condition.
+   */
+  @Test
+  public void concurrentWrite2HdfsWriteOneByOne() throws Exception {
+    ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
+    shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
+    shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+    int maxConcurrency = 3;
+    shuffleServerConf.setInteger(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION, maxConcurrency);
+
+    String appId = "concurrentWrite2HdfsWriteOneByOne_appId";
+    StorageManager storageManager =
+        StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
+    storageManager.registerRemoteStorage(appId, remoteStorage);
+    ShuffleFlushManager manager =
+        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
+
+    for (int i = 0; i < 10; i++) {
+      ShuffleDataFlushEvent shuffleDataFlushEvent = createShuffleDataFlushEvent(appId, i, 1, 1, null);
+      manager.addToFlushQueue(shuffleDataFlushEvent);
+      waitForFlush(manager, appId, i, 5);
+    }
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path(HDFS_URI + "/rss/test/" + appId + "/1/1-1"));
+    long actual = Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("data")).count();
+    assertEquals(1, actual);
+    actual = Arrays.stream(fileStatuses).filter(x -> x.getPath().getName().endsWith("index")).count();
+    assertEquals(1, actual);
+  }
+
   @Test
   public void concurrentWrite2HdfsWriteOfSinglePartition() throws Exception {
     ShuffleServerConf shuffleServerConf = new ShuffleServerConf();
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
index 6b2188a7..d225448d 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandler.java
@@ -18,9 +18,10 @@
 package org.apache.uniffle.storage.handler.impl;
 
 import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -29,13 +30,29 @@ import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
+/**
+ * The {@link PooledHdfsShuffleWriteHandler} wraps multiple underlying
+ * {@link HdfsShuffleWriteHandler} instances to control the concurrency of
+ * writing a single partition to multiple files.
+ *
+ * By leveraging {@link LinkedBlockingDeque}, it always writes to the same file
+ * when there is no race condition, which reduces the number of files on HDFS.
+ */
 public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
   private static final Logger LOGGER = LoggerFactory.getLogger(PooledHdfsShuffleWriteHandler.class);
 
-  private final BlockingQueue<HdfsShuffleWriteHandler> queue;
+  private final LinkedBlockingDeque<ShuffleWriteHandler> queue;
   private final int maxConcurrency;
   private final String basePath;
 
+  // Only for tests
+  @VisibleForTesting
+  public PooledHdfsShuffleWriteHandler(LinkedBlockingDeque<ShuffleWriteHandler> queue) {
+    this.queue = queue;
+    this.maxConcurrency = queue.size();
+    this.basePath = StringUtils.EMPTY;
+  }
+
   public PooledHdfsShuffleWriteHandler(
       String appId,
       int shuffleId,
@@ -48,7 +65,7 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
       int concurrency) {
     // todo: support max concurrency specified by client side
     this.maxConcurrency = concurrency;
-    this.queue = new LinkedBlockingQueue<>(maxConcurrency);
+    this.queue = new LinkedBlockingDeque<>(maxConcurrency);
     this.basePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath,
         ShuffleStorageUtils.getShuffleDataPath(appId, shuffleId, startPartition, endPartition));
 
@@ -80,13 +97,13 @@ public class PooledHdfsShuffleWriteHandler implements ShuffleWriteHandler {
     if (queue.isEmpty()) {
       LOGGER.warn("No free hdfs writer handler, it will wait. storage path: {}", basePath);
     }
-    HdfsShuffleWriteHandler writeHandler = queue.take();
+    ShuffleWriteHandler writeHandler = queue.take();
     try {
       writeHandler.write(shuffleBlocks);
     } finally {
-      // Use add() here because we are sure the capacity will not be exceeded.
-      // Note: add() throws IllegalStateException when queue is full.
-      queue.add(writeHandler);
+      // Use addFirst() here because we are sure the capacity will not be exceeded.
+      // Note: addFirst() throws IllegalStateException when queue is full.
+      queue.addFirst(writeHandler);
     }
   }
 }
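
For contrast, here is a hypothetical sketch, not part of the patch, of why the previous `take()`/`add()` pair on a FIFO queue produced one file per pooled handler: a returned handler lands at the tail, behind the idle ones, so even strictly sequential writes rotate through the whole pool:

```java
import java.util.concurrent.LinkedBlockingQueue;

public class FifoRotationDemo {
  public static void main(String[] args) throws InterruptedException {
    LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(3);
    for (int i = 0; i < 3; i++) {
      queue.add("handler-" + i);
    }
    for (int i = 0; i < 6; i++) {
      String handler = queue.take(); // head of the FIFO queue
      System.out.println(handler);   // handler-0, handler-1, handler-2, handler-0, ...
      queue.add(handler);            // back to the tail, behind the idle handlers
    }
  }
}
```
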
diff --git a/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
new file mode 100644
index 00000000..ca62b835
--- /dev/null
+++ b/storage/src/test/java/org/apache/uniffle/storage/handler/impl/PooledHdfsShuffleWriteHandlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class PooledHdfsShuffleWriteHandlerTest {
+
+  static class FakedShuffleWriteHandler implements ShuffleWriteHandler {
+    private List<Integer> invokedList;
+    private int index;
+    private Runnable execution;
+
+    FakedShuffleWriteHandler(List<Integer> invokedList, int index, Runnable runnable) {
+      this.invokedList = invokedList;
+      this.index = index;
+      this.execution = runnable;
+    }
+
+    @Override
+    public void write(List<ShufflePartitionedBlock> shuffleBlocks) throws Exception {
+      execution.run();
+      invokedList.add(index);
+    }
+  }
+
+  @Test
+  public void writeSameFileWhenNoRaceCondition() throws Exception {
+    int concurrency = 5;
+    CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
+    LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
+    for (int i = 0; i < concurrency; i++) {
+      deque.addFirst(
+          new FakedShuffleWriteHandler(invokedIndexes, i, () -> {
+            try {
+              Thread.sleep(100);
+            } catch (InterruptedException interruptedException) {
+              // ignore
+            }
+          })
+      );
+    }
+    PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(deque);
+
+    for (int i = 0; i < 10; i++) {
+      handler.write(Collections.emptyList());
+    }
+    assertEquals(10, invokedIndexes.size());
+    assertEquals(10, invokedIndexes.stream().filter(x -> x == 4).count());
+  }
+
+  @Test
+  public void concurrentWrite() throws InterruptedException {
+    int concurrency = 5;
+    CopyOnWriteArrayList<Integer> invokedIndexes = new CopyOnWriteArrayList<>();
+    LinkedBlockingDeque<ShuffleWriteHandler> deque = new LinkedBlockingDeque<>(concurrency);
+    for (int i = 0; i < concurrency; i++) {
+      deque.addFirst(
+          new FakedShuffleWriteHandler(invokedIndexes, i, () -> {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException interruptedException) {
+              // ignore
+            }
+          })
+      );
+    }
+    PooledHdfsShuffleWriteHandler handler = new PooledHdfsShuffleWriteHandler(deque);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(concurrency);
+    for (int i = 0; i < concurrency; i++) {
+      executorService.submit(() -> {
+        try {
+          handler.write(Collections.emptyList());
+        } catch (Exception e) {
+          // ignore
+          e.printStackTrace();
+        }
+      });
+    }
+    Awaitility.await().timeout(2, TimeUnit.SECONDS).until(() -> invokedIndexes.size() == concurrency);
+    executorService.shutdownNow();
+  }
+}
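
Note that the single-file behavior is best effort only: under real contention every pooled handler is taken before any is returned, so writes can still spread across up to `maxConcurrency` files, which is what the `concurrentWrite` test above exercises. A minimal sketch of that contention path, again using placeholder strings rather than real handlers:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class ContentionSpreadDemo {
  public static void main(String[] args) throws Exception {
    LinkedBlockingDeque<String> deque = new LinkedBlockingDeque<>(3);
    for (int i = 0; i < 3; i++) {
      deque.addFirst("handler-" + i);
    }
    ExecutorService pool = Executors.newFixedThreadPool(3);
    for (int i = 0; i < 3; i++) {
      pool.submit(() -> {
        try {
          String handler = deque.take(); // each thread grabs a different handler
          System.out.println(handler);   // all three handlers appear once
          Thread.sleep(100);             // simulate a slow HDFS write in flight
          deque.addFirst(handler);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```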