You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/02/21 18:30:22 UTC
[1/4] hadoop git commit: HADOOP-13826. S3A Deadlock in multipart copy
due to thread pool limits. Contributed by Sean Mackrory.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 41d31588d -> 684365968
refs/heads/branch-2.8 10d54eb97 -> bb4b2f641
refs/heads/branch-2.8.0 f95cdefca -> 159b8b6ab
refs/heads/trunk a07ddef10 -> 2158496f6
HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.
(cherry picked from commit e3a74e0369e6e2217d1280179b390227fe1b1684)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/159b8b6a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/159b8b6a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/159b8b6a
Branch: refs/heads/branch-2.8.0
Commit: 159b8b6ab8ba08604d266f774d0513a1fe9b2de0
Parents: f95cdef
Author: Steve Loughran <st...@apache.org>
Authored: Tue Feb 21 17:54:43 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Feb 21 17:55:42 2017 +0000
----------------------------------------------------------------------
.../s3a/BlockingThreadPoolExecutorService.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 21 ++-
.../fs/s3a/scale/ITestS3AConcurrentOps.java | 167 +++++++++++++++++++
3 files changed, 184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/159b8b6a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index eb40c3a..f13942d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/159b8b6a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 559aa66..2bfa0ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,7 +29,10 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -128,7 +131,8 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ListeningExecutorService threadPoolExecutor;
+ private ListeningExecutorService boundedThreadPool;
+ private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
@@ -213,11 +217,17 @@ public class S3AFileSystem extends FileSystem {
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
- threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
+ unboundedThreadPool = new ThreadPoolExecutor(
+ maxThreads, Integer.MAX_VALUE,
+ keepAliveTime, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+ "s3a-transfer-unbounded"));
initTransferManager();
@@ -295,7 +305,7 @@ public class S3AFileSystem extends FileSystem {
transferConfiguration.setMultipartCopyPartSize(partSize);
transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
- transfers = new TransferManager(s3, threadPoolExecutor);
+ transfers = new TransferManager(s3, unboundedThreadPool);
transfers.setConfiguration(transferConfiguration);
}
@@ -564,7 +574,7 @@ public class S3AFileSystem extends FileSystem {
output = new FSDataOutputStream(
new S3ABlockOutputStream(this,
key,
- new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true),
progress,
partSize,
@@ -1955,7 +1965,8 @@ public class S3AFileSystem extends FileSystem {
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
- sb.append(", executor=").append(threadPoolExecutor);
+ sb.append(", boundedExecutor=").append(boundedThreadPool);
+ sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
.append(statistics)
.append("}");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/159b8b6a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
new file mode 100644
index 0000000..b4d3862
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests concurrent operations on a single S3AFileSystem instance.
+ */
+public class ITestS3AConcurrentOps extends S3AScaleTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestS3AConcurrentOps.class);
+ private final int concurrentRenames = 10;
+ private Path testRoot;
+ private Path[] source = new Path[concurrentRenames];
+ private Path[] target = new Path[concurrentRenames];
+ private S3AFileSystem fs;
+ private S3AFileSystem auxFs;
+
+ @Override
+ protected int getTestTimeoutSeconds() {
+ return 16 * 60;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ fs = getRestrictedFileSystem();
+ auxFs = getNormalFileSystem();
+
+ testRoot = path("/ITestS3AConcurrentOps");
+ testRoot = S3ATestUtils.createTestPath(testRoot);
+
+ for (int i = 0; i < concurrentRenames; i++){
+ source[i] = new Path(testRoot, "source" + i);
+ target[i] = new Path(testRoot, "target" + i);
+ }
+
+ LOG.info("Generating data...");
+ auxFs.mkdirs(testRoot);
+ byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+ for (Path aSource : source) {
+ try(FSDataOutputStream out = auxFs.create(aSource)) {
+ for (int mb = 0; mb < 20; mb++) {
+ LOG.debug("{}: Block {}...", aSource, mb);
+ out.write(zeroes);
+ }
+ }
+ }
+ LOG.info("Data generated...");
+ }
+
+ private S3AFileSystem getRestrictedFileSystem() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setInt(MAX_THREADS, 2);
+ conf.setInt(MAX_TOTAL_TASKS, 1);
+
+ conf.set(MIN_MULTIPART_THRESHOLD, "10M");
+ conf.set(MULTIPART_SIZE, "5M");
+
+ S3AFileSystem s3a = getFileSystem();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ private S3AFileSystem getNormalFileSystem() throws Exception {
+ S3AFileSystem s3a = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (auxFs != null) {
+ auxFs.delete(testRoot, true);
+ }
+ }
+
+ /**
+ * Attempts to trigger a deadlock that would happen if any bounded resource
+ * pool became saturated with control tasks that depended on other tasks
+ * that now can't enter the resource pool to get completed.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testParallelRename() throws InterruptedException,
+ ExecutionException, IOException {
+ ExecutorService executor = Executors.newFixedThreadPool(
+ concurrentRenames, new ThreadFactory() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "testParallelRename" + count.getAndIncrement());
+ }
+ });
+ ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+ Future<Boolean>[] futures = new Future[concurrentRenames];
+ for (int i = 0; i < concurrentRenames; i++) {
+ final int index = i;
+ futures[i] = executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ NanoTimer timer = new NanoTimer();
+ boolean result = fs.rename(source[index], target[index]);
+ timer.end("parallel rename %d", index);
+ LOG.info("Rename {} ran from {} to {}", index,
+ timer.getStartTime(), timer.getEndTime());
+ return result;
+ }
+ });
+ }
+ LOG.info("Waiting for tasks to complete...");
+ LOG.info("Deadlock may have occurred if nothing else is logged" +
+ " or the test times out");
+ for (int i = 0; i < concurrentRenames; i++) {
+ assertTrue("No future " + i, futures[i].get());
+ assertPathExists("target path", target[i]);
+ assertPathDoesNotExist("source path", source[i]);
+ }
+ LOG.info("All tasks have completed successfully");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[4/4] hadoop git commit: HADOOP-13826. S3A Deadlock in multipart copy
due to thread pool limits. Contributed by Sean Mackrory.
Posted by st...@apache.org.
HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.
(cherry picked from commit e3a74e0369e6e2217d1280179b390227fe1b1684)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bb4b2f64
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bb4b2f64
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bb4b2f64
Branch: refs/heads/branch-2.8
Commit: bb4b2f641c72ffcb359d496ecc8f73c11f4a1ba1
Parents: 10d54eb
Author: Steve Loughran <st...@apache.org>
Authored: Tue Feb 21 17:54:43 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Feb 21 18:29:46 2017 +0000
----------------------------------------------------------------------
.../s3a/BlockingThreadPoolExecutorService.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 21 ++-
.../fs/s3a/scale/ITestS3AConcurrentOps.java | 167 +++++++++++++++++++
3 files changed, 184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4b2f64/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index eb40c3a..f13942d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4b2f64/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 559aa66..2bfa0ea 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,7 +29,10 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -128,7 +131,8 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ListeningExecutorService threadPoolExecutor;
+ private ListeningExecutorService boundedThreadPool;
+ private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
@@ -213,11 +217,17 @@ public class S3AFileSystem extends FileSystem {
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
- threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
+ unboundedThreadPool = new ThreadPoolExecutor(
+ maxThreads, Integer.MAX_VALUE,
+ keepAliveTime, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+ "s3a-transfer-unbounded"));
initTransferManager();
@@ -295,7 +305,7 @@ public class S3AFileSystem extends FileSystem {
transferConfiguration.setMultipartCopyPartSize(partSize);
transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
- transfers = new TransferManager(s3, threadPoolExecutor);
+ transfers = new TransferManager(s3, unboundedThreadPool);
transfers.setConfiguration(transferConfiguration);
}
@@ -564,7 +574,7 @@ public class S3AFileSystem extends FileSystem {
output = new FSDataOutputStream(
new S3ABlockOutputStream(this,
key,
- new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true),
progress,
partSize,
@@ -1955,7 +1965,8 @@ public class S3AFileSystem extends FileSystem {
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
- sb.append(", executor=").append(threadPoolExecutor);
+ sb.append(", boundedExecutor=").append(boundedThreadPool);
+ sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
.append(statistics)
.append("}");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb4b2f64/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
new file mode 100644
index 0000000..b4d3862
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests concurrent operations on a single S3AFileSystem instance.
+ */
+public class ITestS3AConcurrentOps extends S3AScaleTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestS3AConcurrentOps.class);
+ private final int concurrentRenames = 10;
+ private Path testRoot;
+ private Path[] source = new Path[concurrentRenames];
+ private Path[] target = new Path[concurrentRenames];
+ private S3AFileSystem fs;
+ private S3AFileSystem auxFs;
+
+ @Override
+ protected int getTestTimeoutSeconds() {
+ return 16 * 60;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ fs = getRestrictedFileSystem();
+ auxFs = getNormalFileSystem();
+
+ testRoot = path("/ITestS3AConcurrentOps");
+ testRoot = S3ATestUtils.createTestPath(testRoot);
+
+ for (int i = 0; i < concurrentRenames; i++){
+ source[i] = new Path(testRoot, "source" + i);
+ target[i] = new Path(testRoot, "target" + i);
+ }
+
+ LOG.info("Generating data...");
+ auxFs.mkdirs(testRoot);
+ byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+ for (Path aSource : source) {
+ try(FSDataOutputStream out = auxFs.create(aSource)) {
+ for (int mb = 0; mb < 20; mb++) {
+ LOG.debug("{}: Block {}...", aSource, mb);
+ out.write(zeroes);
+ }
+ }
+ }
+ LOG.info("Data generated...");
+ }
+
+ private S3AFileSystem getRestrictedFileSystem() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setInt(MAX_THREADS, 2);
+ conf.setInt(MAX_TOTAL_TASKS, 1);
+
+ conf.set(MIN_MULTIPART_THRESHOLD, "10M");
+ conf.set(MULTIPART_SIZE, "5M");
+
+ S3AFileSystem s3a = getFileSystem();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ private S3AFileSystem getNormalFileSystem() throws Exception {
+ S3AFileSystem s3a = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (auxFs != null) {
+ auxFs.delete(testRoot, true);
+ }
+ }
+
+ /**
+ * Attempts to trigger a deadlock that would happen if any bounded resource
+ * pool became saturated with control tasks that depended on other tasks
+ * that now can't enter the resource pool to get completed.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testParallelRename() throws InterruptedException,
+ ExecutionException, IOException {
+ ExecutorService executor = Executors.newFixedThreadPool(
+ concurrentRenames, new ThreadFactory() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "testParallelRename" + count.getAndIncrement());
+ }
+ });
+ ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+ Future<Boolean>[] futures = new Future[concurrentRenames];
+ for (int i = 0; i < concurrentRenames; i++) {
+ final int index = i;
+ futures[i] = executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ NanoTimer timer = new NanoTimer();
+ boolean result = fs.rename(source[index], target[index]);
+ timer.end("parallel rename %d", index);
+ LOG.info("Rename {} ran from {} to {}", index,
+ timer.getStartTime(), timer.getEndTime());
+ return result;
+ }
+ });
+ }
+ LOG.info("Waiting for tasks to complete...");
+ LOG.info("Deadlock may have occurred if nothing else is logged" +
+ " or the test times out");
+ for (int i = 0; i < concurrentRenames; i++) {
+ assertTrue("No future " + i, futures[i].get());
+ assertPathExists("target path", target[i]);
+ assertPathDoesNotExist("source path", source[i]);
+ }
+ LOG.info("All tasks have completed successfully");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/4] hadoop git commit: HADOOP-13826. S3A Deadlock in multipart copy
due to thread pool limits. Contributed by Sean Mackrory.
Posted by st...@apache.org.
HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/68436596
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/68436596
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/68436596
Branch: refs/heads/branch-2
Commit: 6843659689e226d961d0adb5f475ca7babb2723c
Parents: 41d3158
Author: Steve Loughran <st...@apache.org>
Authored: Tue Feb 21 17:54:43 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Feb 21 18:29:30 2017 +0000
----------------------------------------------------------------------
.../s3a/BlockingThreadPoolExecutorService.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 21 ++-
.../fs/s3a/scale/ITestS3AConcurrentOps.java | 167 +++++++++++++++++++
3 files changed, 184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68436596/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index eb40c3a..f13942d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68436596/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index e5e4118..75ae414 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,7 +29,10 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -131,7 +134,8 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ListeningExecutorService threadPoolExecutor;
+ private ListeningExecutorService boundedThreadPool;
+ private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
@@ -216,11 +220,17 @@ public class S3AFileSystem extends FileSystem {
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
- threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
+ unboundedThreadPool = new ThreadPoolExecutor(
+ maxThreads, Integer.MAX_VALUE,
+ keepAliveTime, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+ "s3a-transfer-unbounded"));
initTransferManager();
@@ -307,7 +317,7 @@ public class S3AFileSystem extends FileSystem {
transferConfiguration.setMultipartCopyPartSize(partSize);
transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
- transfers = new TransferManager(s3, threadPoolExecutor);
+ transfers = new TransferManager(s3, unboundedThreadPool);
transfers.setConfiguration(transferConfiguration);
}
@@ -585,7 +595,7 @@ public class S3AFileSystem extends FileSystem {
output = new FSDataOutputStream(
new S3ABlockOutputStream(this,
key,
- new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true),
progress,
partSize,
@@ -2059,7 +2069,8 @@ public class S3AFileSystem extends FileSystem {
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
- sb.append(", executor=").append(threadPoolExecutor);
+ sb.append(", boundedExecutor=").append(boundedThreadPool);
+ sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
.append(statistics)
.append("}");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/68436596/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
new file mode 100644
index 0000000..b4d3862
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests concurrent operations on a single S3AFileSystem instance.
+ */
+public class ITestS3AConcurrentOps extends S3AScaleTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestS3AConcurrentOps.class);
+ private final int concurrentRenames = 10;
+ private Path testRoot;
+ private Path[] source = new Path[concurrentRenames];
+ private Path[] target = new Path[concurrentRenames];
+ private S3AFileSystem fs;
+ private S3AFileSystem auxFs;
+
+ @Override
+ protected int getTestTimeoutSeconds() {
+ return 16 * 60;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ fs = getRestrictedFileSystem();
+ auxFs = getNormalFileSystem();
+
+ testRoot = path("/ITestS3AConcurrentOps");
+ testRoot = S3ATestUtils.createTestPath(testRoot);
+
+ for (int i = 0; i < concurrentRenames; i++){
+ source[i] = new Path(testRoot, "source" + i);
+ target[i] = new Path(testRoot, "target" + i);
+ }
+
+ LOG.info("Generating data...");
+ auxFs.mkdirs(testRoot);
+ byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+ for (Path aSource : source) {
+ try(FSDataOutputStream out = auxFs.create(aSource)) {
+ for (int mb = 0; mb < 20; mb++) {
+ LOG.debug("{}: Block {}...", aSource, mb);
+ out.write(zeroes);
+ }
+ }
+ }
+ LOG.info("Data generated...");
+ }
+
+ private S3AFileSystem getRestrictedFileSystem() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setInt(MAX_THREADS, 2);
+ conf.setInt(MAX_TOTAL_TASKS, 1);
+
+ conf.set(MIN_MULTIPART_THRESHOLD, "10M");
+ conf.set(MULTIPART_SIZE, "5M");
+
+ S3AFileSystem s3a = getFileSystem();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ private S3AFileSystem getNormalFileSystem() throws Exception {
+ S3AFileSystem s3a = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (auxFs != null) {
+ auxFs.delete(testRoot, true);
+ }
+ }
+
+ /**
+ * Attempts to trigger a deadlock that would happen if any bounded resource
+ * pool became saturated with control tasks that depended on other tasks
+ * that now can't enter the resource pool to get completed.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testParallelRename() throws InterruptedException,
+ ExecutionException, IOException {
+ ExecutorService executor = Executors.newFixedThreadPool(
+ concurrentRenames, new ThreadFactory() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "testParallelRename" + count.getAndIncrement());
+ }
+ });
+ ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+ Future<Boolean>[] futures = new Future[concurrentRenames];
+ for (int i = 0; i < concurrentRenames; i++) {
+ final int index = i;
+ futures[i] = executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ NanoTimer timer = new NanoTimer();
+ boolean result = fs.rename(source[index], target[index]);
+ timer.end("parallel rename %d", index);
+ LOG.info("Rename {} ran from {} to {}", index,
+ timer.getStartTime(), timer.getEndTime());
+ return result;
+ }
+ });
+ }
+ LOG.info("Waiting for tasks to complete...");
+ LOG.info("Deadlock may have occurred if nothing else is logged" +
+ " or the test times out");
+ for (int i = 0; i < concurrentRenames; i++) {
+ assertTrue("No future " + i, futures[i].get());
+ assertPathExists("target path", target[i]);
+ assertPathDoesNotExist("source path", source[i]);
+ }
+ LOG.info("All tasks have completed successfully");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/4] hadoop git commit: HADOOP-13826. S3A Deadlock in multipart copy
due to thread pool limits. Contributed by Sean Mackrory.
Posted by st...@apache.org.
HADOOP-13826. S3A Deadlock in multipart copy due to thread pool limits. Contributed by Sean Mackrory.
(cherry picked from commit e3a74e0369e6e2217d1280179b390227fe1b1684)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2158496f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2158496f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2158496f
Branch: refs/heads/trunk
Commit: 2158496f6bed5f9d14751b82bd5d43b9fd786b95
Parents: a07ddef
Author: Steve Loughran <st...@apache.org>
Authored: Tue Feb 21 17:54:43 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Tue Feb 21 18:28:49 2017 +0000
----------------------------------------------------------------------
.../s3a/BlockingThreadPoolExecutorService.java | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 21 ++-
.../fs/s3a/scale/ITestS3AConcurrentOps.java | 167 +++++++++++++++++++
3 files changed, 184 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index 5ff96a5..5b25730 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -86,7 +86,7 @@ final class BlockingThreadPoolExecutorService
* @return a thread factory that creates named, daemon threads with
* the supplied exception handler and normal priority
*/
- private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ static ThreadFactory newDaemonThreadFactory(final String prefix) {
final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
return new ThreadFactory() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index bffc210..8b1a6d0 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,7 +29,10 @@ import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.Objects;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -131,7 +134,8 @@ public class S3AFileSystem extends FileSystem {
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
- private ListeningExecutorService threadPoolExecutor;
+ private ListeningExecutorService boundedThreadPool;
+ private ExecutorService unboundedThreadPool;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
@@ -216,11 +220,17 @@ public class S3AFileSystem extends FileSystem {
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
- threadPoolExecutor = BlockingThreadPoolExecutorService.newInstance(
+ boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks,
keepAliveTime, TimeUnit.SECONDS,
"s3a-transfer-shared");
+ unboundedThreadPool = new ThreadPoolExecutor(
+ maxThreads, Integer.MAX_VALUE,
+ keepAliveTime, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ BlockingThreadPoolExecutorService.newDaemonThreadFactory(
+ "s3a-transfer-unbounded"));
initTransferManager();
@@ -307,7 +317,7 @@ public class S3AFileSystem extends FileSystem {
transferConfiguration.setMultipartCopyPartSize(partSize);
transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
- transfers = new TransferManager(s3, threadPoolExecutor);
+ transfers = new TransferManager(s3, unboundedThreadPool);
transfers.setConfiguration(transferConfiguration);
}
@@ -585,7 +595,7 @@ public class S3AFileSystem extends FileSystem {
output = new FSDataOutputStream(
new S3ABlockOutputStream(this,
key,
- new SemaphoredDelegatingExecutor(threadPoolExecutor,
+ new SemaphoredDelegatingExecutor(boundedThreadPool,
blockOutputActiveBlocks, true),
progress,
partSize,
@@ -2060,7 +2070,8 @@ public class S3AFileSystem extends FileSystem {
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
- sb.append(", executor=").append(threadPoolExecutor);
+ sb.append(", boundedExecutor=").append(boundedThreadPool);
+ sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", statistics {")
.append(statistics)
.append("}");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2158496f/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
new file mode 100644
index 0000000..b4d3862
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import java.io.IOException;
+
+import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
+
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests concurrent operations on a single S3AFileSystem instance.
+ */
+public class ITestS3AConcurrentOps extends S3AScaleTestBase {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ITestS3AConcurrentOps.class);
+ private final int concurrentRenames = 10;
+ private Path testRoot;
+ private Path[] source = new Path[concurrentRenames];
+ private Path[] target = new Path[concurrentRenames];
+ private S3AFileSystem fs;
+ private S3AFileSystem auxFs;
+
+ @Override
+ protected int getTestTimeoutSeconds() {
+ return 16 * 60;
+ }
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ fs = getRestrictedFileSystem();
+ auxFs = getNormalFileSystem();
+
+ testRoot = path("/ITestS3AConcurrentOps");
+ testRoot = S3ATestUtils.createTestPath(testRoot);
+
+ for (int i = 0; i < concurrentRenames; i++){
+ source[i] = new Path(testRoot, "source" + i);
+ target[i] = new Path(testRoot, "target" + i);
+ }
+
+ LOG.info("Generating data...");
+ auxFs.mkdirs(testRoot);
+ byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+ for (Path aSource : source) {
+ try(FSDataOutputStream out = auxFs.create(aSource)) {
+ for (int mb = 0; mb < 20; mb++) {
+ LOG.debug("{}: Block {}...", aSource, mb);
+ out.write(zeroes);
+ }
+ }
+ }
+ LOG.info("Data generated...");
+ }
+
+ private S3AFileSystem getRestrictedFileSystem() throws Exception {
+ Configuration conf = getConfiguration();
+ conf.setInt(MAX_THREADS, 2);
+ conf.setInt(MAX_TOTAL_TASKS, 1);
+
+ conf.set(MIN_MULTIPART_THRESHOLD, "10M");
+ conf.set(MULTIPART_SIZE, "5M");
+
+ S3AFileSystem s3a = getFileSystem();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ private S3AFileSystem getNormalFileSystem() throws Exception {
+ S3AFileSystem s3a = new S3AFileSystem();
+ Configuration conf = new Configuration();
+ URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+ s3a.initialize(rootURI, conf);
+ return s3a;
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (auxFs != null) {
+ auxFs.delete(testRoot, true);
+ }
+ }
+
+ /**
+ * Attempts to trigger a deadlock that would happen if any bounded resource
+ * pool became saturated with control tasks that depended on other tasks
+ * that now can't enter the resource pool to get completed.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testParallelRename() throws InterruptedException,
+ ExecutionException, IOException {
+ ExecutorService executor = Executors.newFixedThreadPool(
+ concurrentRenames, new ThreadFactory() {
+ private AtomicInteger count = new AtomicInteger(0);
+
+ public Thread newThread(Runnable r) {
+ return new Thread(r,
+ "testParallelRename" + count.getAndIncrement());
+ }
+ });
+ ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+ Future<Boolean>[] futures = new Future[concurrentRenames];
+ for (int i = 0; i < concurrentRenames; i++) {
+ final int index = i;
+ futures[i] = executor.submit(new Callable<Boolean>() {
+ @Override
+ public Boolean call() throws Exception {
+ NanoTimer timer = new NanoTimer();
+ boolean result = fs.rename(source[index], target[index]);
+ timer.end("parallel rename %d", index);
+ LOG.info("Rename {} ran from {} to {}", index,
+ timer.getStartTime(), timer.getEndTime());
+ return result;
+ }
+ });
+ }
+ LOG.info("Waiting for tasks to complete...");
+ LOG.info("Deadlock may have occurred if nothing else is logged" +
+ " or the test times out");
+ for (int i = 0; i < concurrentRenames; i++) {
+ assertTrue("No future " + i, futures[i].get());
+ assertPathExists("target path", target[i]);
+ assertPathDoesNotExist("source path", source[i]);
+ }
+ LOG.info("All tasks have completed successfully");
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org