Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/21 18:30:23 UTC

[2/4] hadoop git commit: HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.

HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.

(cherry picked from commit e3a74e0369e6e2217d1280179b390227fe1b1684)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2158496f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2158496f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2158496f

Branch: refs/heads/trunk
Commit: 2158496f6bed5f9d14751b82bd5d43b9fd786b95
Parents: a07ddef
Author: Steve Loughran <st...@apache.org>
Authored: Tue Feb 21 17:54:43 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Feb 21 18:28:49 2017 +0000

----------------------------------------------------------------------
 .../s3a/BlockingThreadPoolExecutorService.java  |   2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  21 ++-
 .../fs/s3a/scale/ITestS3AConcurrentOps.java     | 167 +++++++++++++++++++
 3 files changed, 184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
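
For context on the hang itself: the AWS SDK TransferManager runs a
multipart copy as a control task which submits per-part copy tasks and
then waits for them. Before this patch, control tasks and part tasks
both went through the single bounded, blocking "s3a-transfer-shared"
pool, so once enough control tasks had saturated the pool, the part
tasks they were blocked on could never be admitted. A minimal,
self-contained sketch of that class of deadlock (illustrative only,
not code from the patch):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PoolDeadlockDemo {
  public static void main(String[] args) throws Exception {
    // One worker thread stands in for a fully saturated bounded pool.
    ExecutorService pool = Executors.newFixedThreadPool(1);
    Future<?> control = pool.submit(() -> {
      // The control task (think: multipart copy coordinator) submits
      // a subtask to the same pool and blocks on its result...
      Future<?> part = pool.submit(
          () -> System.out.println("part copied"));
      try {
        // ...but the pool's only worker is running *this* task, so
        // the subtask never starts: deadlock.
        part.get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
    control.get();  // never returns
  }
}

The patch below keeps client-facing writes on the bounded pool but gives
the TransferManager its own pool whose submissions never block.
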


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index 5ff96a5..5b25730 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
    * @return a thread factory that creates named, daemon threads with
    * the supplied exception handler and normal priority
    */
-  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+  static ThreadFactory newDaemonThreadFactory(final String prefix) {
     final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
     return new ThreadFactory() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index bffc210..8b1a6d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,7 +29,10 @@ import java.util.Date;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -131,7 +134,8 @@ public class S3AFileSystem extends FileSystem {
   private long partSize;
   private boolean enableMultiObjectsDelete;
   private TransferManager transfers;
-  private ListeningExecutorService threadPoolExecutor;
+  private ListeningExecutorService boundedThreadPool;
+  private ExecutorService unboundedThreadPool;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private static final Logger PROGRESS =
@@ -216,11 +220,17 @@ public class S3AFileSystem extends FileSystem {
           MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
       long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
           DEFAULT_KEEPALIVE_TIME, 0);
-      threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+      boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
           maxThreads,
           maxThreads + totalTasks,
           keepAliveTime, TimeUnit.SECONDS,
           "s3a-transfer-shared");
+      unboundedThreadPool = new ThreadPoolExecutor(
+          maxThreads, Integer.MAX_VALUE,
+          keepAliveTime, TimeUnit.SECONDS,
+          new LinkedBlockingQueue<Runnable>(),
+          BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+              "s3a-transfer-unbounded"));
 
       initTransferManager();
 
@@ -307,7 +317,7 @@ public class S3AFileSystem extends FileSystem {
     transferConfiguration.setMultipartCopyPartSize(partSize);
     transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
 
-    transfers = new TransferManager(s3, threadPoolExecutor);
+    transfers = new TransferManager(s3, unboundedThreadPool);
     transfers.setConfiguration(transferConfiguration);
   }
 
@@ -585,7 +595,7 @@ public class S3AFileSystem extends FileSystem {
       output = new FSDataOutputStream(
           new S3ABlockOutputStream(this,
               key,
-              new SemaphoredDelegatingExecutor(threadPoolExecutor,
+              new SemaphoredDelegatingExecutor(boundedThreadPool,
                   blockOutputActiveBlocks, true),
               progress,
               partSize,
@@ -2060,7 +2070,8 @@ public class S3AFileSystem extends FileSystem {
     if (blockFactory != null) {
       sb.append(", blockFactory=").append(blockFactory);
     }
-    sb.append(", executor=").append(threadPoolExecutor);
+    sb.append(", boundedExecutor=").append(boundedThreadPool);
+    sb.append(", unboundedExecutor=").append(unboundedThreadPool);
     sb.append(", statistics {")
         .append(statistics)
         .append("}");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
new file mode 100644
index 0000000..b4d3862
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests concurrent operations on a single S3AFileSystem instance.
+ */
+public class ITestS3AConcurrentOps extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ITestS3AConcurrentOps.class);
+  private final int concurrentRenames = 10;
+  private Path testRoot;
+  private Path[] source = new Path[concurrentRenames];
+  private Path[] target = new Path[concurrentRenames];
+  private S3AFileSystem fs;
+  private S3AFileSystem auxFs;
+
+  @Override
+  protected int getTestTimeoutSeconds() {
+    return 16 * 60;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    fs = getRestrictedFileSystem();
+    auxFs = getNormalFileSystem();
+
+    testRoot = path("/ITestS3AConcurrentOps");
+    testRoot = S3ATestUtils.createTestPath(testRoot);
+
+    for (int i = 0; i < concurrentRenames; i++){
+      source[i] = new Path(testRoot, "source" + i);
+      target[i] = new Path(testRoot, "target" + i);
+    }
+
+    LOG.info("Generating data...");
+    auxFs.mkdirs(testRoot);
+    byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+    for (Path aSource : source) {
+      try(FSDataOutputStream out = auxFs.create(aSource)) {
+        for (int mb = 0; mb < 20; mb++) {
+          LOG.debug("{}: Block {}...", aSource, mb);
+          out.write(zeroes);
+        }
+      }
+    }
+    LOG.info("Data generated...");
+  }
+
+  private S3AFileSystem getRestrictedFileSystem() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setInt(MAX_THREADS, 2);
+    conf.setInt(MAX_TOTAL_TASKS, 1);
+
+    conf.set(MIN_MULTIPART_THRESHOLD, "10M");
+    conf.set(MULTIPART_SIZE, "5M");
+
+    S3AFileSystem s3a = getFileSystem();
+    URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+    s3a.initialize(rootURI, conf);
+    return s3a;
+  }
+
+  private S3AFileSystem getNormalFileSystem() throws Exception {
+    S3AFileSystem s3a = new S3AFileSystem();
+    Configuration conf = new Configuration();
+    URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+    s3a.initialize(rootURI, conf);
+    return s3a;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (auxFs != null) {
+      auxFs.delete(testRoot, true);
+    }
+  }
+
+  /**
+   * Attempts to trigger a deadlock that would happen if any bounded resource
+   * pool became saturated with control tasks that depended on other tasks
+   * that now can't enter the resource pool to get completed.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testParallelRename() throws InterruptedException,
+      ExecutionException, IOException {
+    ExecutorService executor = Executors.newFixedThreadPool(
+        concurrentRenames, new ThreadFactory() {
+          private AtomicInteger count = new AtomicInteger(0);
+
+          public Thread newThread(Runnable r) {
+            return new Thread(r,
+                "testParallelRename" + count.getAndIncrement());
+          }
+        });
+    ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+    Future<Boolean>[] futures = new Future[concurrentRenames];
+    for (int i = 0; i < concurrentRenames; i++) {
+      final int index = i;
+      futures[i] = executor.submit(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          NanoTimer timer = new NanoTimer();
+          boolean result = fs.rename(source[index], target[index]);
+          timer.end("parallel rename %d", index);
+          LOG.info("Rename {} ran from {} to {}", index,
+              timer.getStartTime(), timer.getEndTime());
+          return result;
+        }
+      });
+    }
+    LOG.info("Waiting for tasks to complete...");
+    LOG.info("Deadlock may have occurred if nothing else is logged" +
+        " or the test times out");
+    for (int i = 0; i < concurrentRenames; i++) {
+      assertTrue("No future " + i, futures[i].get());
+      assertPathExists("target path", target[i]);
+      assertPathDoesNotExist("source path", source[i]);
+    }
+    LOG.info("All tasks have completed successfully");
+  }
+}
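
To run just this test against a configured bucket, the standard
failsafe selector should work, along the lines of (exact flags vary
with your build setup):

  mvn verify -Dit.test=ITestS3AConcurrentOps

The restricted filesystem above (2 threads, 1 queued task, a 10M
multipart threshold against 20MB source files) forces every rename
through a multipart copy on a deliberately starved pool; with the old
shared pool, that is the condition that could deadlock.
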

