You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/28 05:00:48 UTC
[3/3] hadoop git commit: HADOOP-15039/HADOOP-15189. Move
SemaphoredDelegatingExecutor to hadoop-common Contributed by Genmao Yu
HADOOP-15039/HADOOP-15189. Move SemaphoredDelegatingExecutor to hadoop-common
Contributed by Genmao Yu
(cherry picked from commit 312c5716943b31f0f6363f6df02523f959eb981e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9875b04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9875b04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9875b04
Branch: refs/heads/branch-2.9
Commit: d9875b046fe0371fd6a3d54e285d216447c59a08
Parents: d69df0b
Author: Steve Loughran <st...@apache.org>
Authored: Sat Jan 27 21:00:25 2018 -0800
Committer: Steve Loughran <st...@apache.org>
Committed: Sat Jan 27 21:00:25 2018 -0800
----------------------------------------------------------------------
.../util/BlockingThreadPoolExecutorService.java | 170 ++++++++++++++
.../util/SemaphoredDelegatingExecutor.java | 227 ++++++++++++++++++
.../s3a/BlockingThreadPoolExecutorService.java | 170 --------------
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 2 +
.../fs/s3a/SemaphoredDelegatingExecutor.java | 230 -------------------
.../ITestBlockingThreadPoolExecutorService.java | 2 +
6 files changed, 401 insertions(+), 400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..79128de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@InterfaceAudience.Private
+public final class BlockingThreadPoolExecutorService
+ extends SemaphoredDelegatingExecutor {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(BlockingThreadPoolExecutorService.class);
+
+ private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+
+ private final ThreadPoolExecutor eventProcessingExecutor;
+
+ /**
+ * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+ * created thread uniquely,
+ * with a common prefix.
+ *
+ * @param prefix The prefix of every created Thread's name
+ * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+ */
+ static ThreadFactory getNamedThreadFactory(final String prefix) {
+ SecurityManager s = System.getSecurityManager();
+ final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
+ Thread.currentThread().getThreadGroup();
+
+ return new ThreadFactory() {
+ private final AtomicInteger threadNumber = new AtomicInteger(1);
+ private final int poolNum = POOLNUMBER.getAndIncrement();
+ private final ThreadGroup group = threadGroup;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ final String name =
+ prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+ return new Thread(group, r, name);
+ }
+ };
+ }
+
+ /**
+ * Get a named {@link ThreadFactory} that just builds daemon threads.
+ *
+ * @param prefix name prefix for all threads created from the factory
+ * @return a thread factory that creates named daemon threads with
+ * normal priority
+ */
+ public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+ final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+ return new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread t = namedFactory.newThread(r);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ }
+
+ };
+ }
+
+ private BlockingThreadPoolExecutorService(int permitCount,
+ ThreadPoolExecutor eventProcessingExecutor) {
+ super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
+ permitCount, false);
+ this.eventProcessingExecutor = eventProcessingExecutor;
+ }
+
+ /**
+ * A thread pool that blocks clients submitting additional tasks if
+ * there are already {@code activeTasks} running threads and {@code
+ * waitingTasks} tasks waiting in its queue.
+ *
+ * @param activeTasks maximum number of active tasks
+ * @param waitingTasks maximum number of waiting tasks
+ * @param keepAliveTime time until threads are cleaned up in {@code unit}
+ * @param unit time unit
+ * @param prefixName prefix of name for threads
+ */
+ public static BlockingThreadPoolExecutorService newInstance(
+ int activeTasks,
+ int waitingTasks,
+ long keepAliveTime, TimeUnit unit,
+ String prefixName) {
+
+ /* Although we generally only expect up to waitingTasks tasks in the
+ queue, we need to be able to buffer all tasks in case dequeueing is
+ slower than enqueueing. */
+ final BlockingQueue<Runnable> workQueue =
+ new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+ ThreadPoolExecutor eventProcessingExecutor =
+ new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
+ workQueue, newDaemonThreadFactory(prefixName),
+ new RejectedExecutionHandler() {
+ @Override
+ public void rejectedExecution(Runnable r,
+ ThreadPoolExecutor executor) {
+ // This is not expected to happen.
+ LOG.error("Could not submit task to executor {}",
+ executor.toString());
+ }
+ });
+ eventProcessingExecutor.allowCoreThreadTimeOut(true);
+ return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+ eventProcessingExecutor);
+ }
+
+ /**
+ * Get the actual number of active threads.
+ * @return the active thread count
+ */
+ int getActiveCount() {
+ return eventProcessingExecutor.getActiveCount();
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "BlockingThreadPoolExecutorService{");
+ sb.append(super.toString());
+ sb.append(", activeCount=").append(getActiveCount());
+ sb.append('}');
+ return sb.toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000..7a1e33a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
+ * contains the thread pool logic, whereas this isolates the semaphore
+ * and submit logic for use with other thread pools and delegation models.
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@SuppressWarnings("NullableProblems")
+@InterfaceAudience.Private
+public class SemaphoredDelegatingExecutor extends
+ ForwardingListeningExecutorService {
+
+ private final Semaphore queueingPermits;
+ private final ListeningExecutorService executorDelegatee;
+ private final int permitCount;
+
+ /**
+ * Instantiate.
+ * @param executorDelegatee Executor to delegate to
+ * @param permitCount number of permits available for queueing tasks
+ * @param fair should the semaphore be "fair"
+ */
+ public SemaphoredDelegatingExecutor(
+ ListeningExecutorService executorDelegatee,
+ int permitCount,
+ boolean fair) {
+ this.permitCount = permitCount;
+ queueingPermits = new Semaphore(permitCount, fair);
+ this.executorDelegatee = executorDelegatee;
+ }
+
+ @Override
+ protected ListeningExecutorService delegate() {
+ return executorDelegatee;
+ }
+
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+ long timeout, TimeUnit unit) throws InterruptedException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+ TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ throw new RuntimeException("Not implemented");
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new CallableWithPermitRelease<>(task));
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable task, T result) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task), result);
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable task) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return Futures.immediateFailedCheckedFuture(e);
+ }
+ return super.submit(new RunnableWithPermitRelease(task));
+ }
+
+ @Override
+ public void execute(Runnable command) {
+ try {
+ queueingPermits.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ super.execute(new RunnableWithPermitRelease(command));
+ }
+
+ /**
+ * Get the number of permits available; guaranteed to be
+ * {@code 0 <= availablePermits <= permitCount}.
+ * @return the number of permits available at the time of invocation.
+ */
+ public int getAvailablePermits() {
+ return queueingPermits.availablePermits();
+ }
+
+ /**
+ * Get the number of threads waiting to acquire a permit.
+ * @return snapshot of the length of the queue of blocked threads.
+ */
+ public int getWaitingCount() {
+ return queueingPermits.getQueueLength();
+ }
+
+ /**
+ * Total number of permits.
+ * @return the number of permits as set in the constructor
+ */
+ public int getPermitCount() {
+ return permitCount;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "SemaphoredDelegatingExecutor{");
+ sb.append("permitCount=").append(getPermitCount());
+ sb.append(", available=").append(getAvailablePermits());
+ sb.append(", waiting=").append(getWaitingCount());
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Releases a permit after the task is executed.
+ */
+ class RunnableWithPermitRelease implements Runnable {
+
+ private Runnable delegatee;
+
+ RunnableWithPermitRelease(Runnable delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public void run() {
+ try {
+ delegatee.run();
+ } finally {
+ queueingPermits.release();
+ }
+
+ }
+ }
+
+ /**
+ * Releases a permit after the task is completed.
+ */
+ class CallableWithPermitRelease<T> implements Callable<T> {
+
+ private Callable<T> delegatee;
+
+ CallableWithPermitRelease(Callable<T> delegatee) {
+ this.delegatee = delegatee;
+ }
+
+ @Override
+ public T call() throws Exception {
+ try {
+ return delegatee.call();
+ } finally {
+ queueingPermits.release();
+ }
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
deleted file mode 100644
index f13942d..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * This ExecutorService blocks the submission of new tasks when its queue is
- * already full by using a semaphore. Task submissions require permits, task
- * completions release permits.
- * <p>
- * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
- * this s4 threadpool</a>
- */
-@InterfaceAudience.Private
-final class BlockingThreadPoolExecutorService
- extends SemaphoredDelegatingExecutor {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(BlockingThreadPoolExecutorService.class);
-
- private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
-
- private final ThreadPoolExecutor eventProcessingExecutor;
-
- /**
- * Returns a {@link java.util.concurrent.ThreadFactory} that names each
- * created thread uniquely,
- * with a common prefix.
- *
- * @param prefix The prefix of every created Thread's name
- * @return a {@link java.util.concurrent.ThreadFactory} that names threads
- */
- static ThreadFactory getNamedThreadFactory(final String prefix) {
- SecurityManager s = System.getSecurityManager();
- final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
- Thread.currentThread().getThreadGroup();
-
- return new ThreadFactory() {
- private final AtomicInteger threadNumber = new AtomicInteger(1);
- private final int poolNum = POOLNUMBER.getAndIncrement();
- private final ThreadGroup group = threadGroup;
-
- @Override
- public Thread newThread(Runnable r) {
- final String name =
- prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
- return new Thread(group, r, name);
- }
- };
- }
-
- /**
- * Get a named {@link ThreadFactory} that just builds daemon threads.
- *
- * @param prefix name prefix for all threads created from the factory
- * @return a thread factory that creates named, daemon threads with
- * the supplied exception handler and normal priority
- */
- static ThreadFactory newDaemonThreadFactory(final String prefix) {
- final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
- return new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- Thread t = namedFactory.newThread(r);
- if (!t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.NORM_PRIORITY) {
- t.setPriority(Thread.NORM_PRIORITY);
- }
- return t;
- }
-
- };
- }
-
- private BlockingThreadPoolExecutorService(int permitCount,
- ThreadPoolExecutor eventProcessingExecutor) {
- super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
- permitCount, false);
- this.eventProcessingExecutor = eventProcessingExecutor;
- }
-
- /**
- * A thread pool that that blocks clients submitting additional tasks if
- * there are already {@code activeTasks} running threads and {@code
- * waitingTasks} tasks waiting in its queue.
- *
- * @param activeTasks maximum number of active tasks
- * @param waitingTasks maximum number of waiting tasks
- * @param keepAliveTime time until threads are cleaned up in {@code unit}
- * @param unit time unit
- * @param prefixName prefix of name for threads
- */
- public static BlockingThreadPoolExecutorService newInstance(
- int activeTasks,
- int waitingTasks,
- long keepAliveTime, TimeUnit unit,
- String prefixName) {
-
- /* Although we generally only expect up to waitingTasks tasks in the
- queue, we need to be able to buffer all tasks in case dequeueing is
- slower than enqueueing. */
- final BlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(waitingTasks + activeTasks);
- ThreadPoolExecutor eventProcessingExecutor =
- new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
- workQueue, newDaemonThreadFactory(prefixName),
- new RejectedExecutionHandler() {
- @Override
- public void rejectedExecution(Runnable r,
- ThreadPoolExecutor executor) {
- // This is not expected to happen.
- LOG.error("Could not submit task to executor {}",
- executor.toString());
- }
- });
- eventProcessingExecutor.allowCoreThreadTimeOut(true);
- return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
- eventProcessingExecutor);
- }
-
- /**
- * Get the actual number of active threads.
- * @return the active thread count
- */
- int getActiveCount() {
- return eventProcessingExecutor.getActiveCount();
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(
- "BlockingThreadPoolExecutorService{");
- sb.append(super.toString());
- sb.append(", activeCount=").append(getActiveCount());
- sb.append('}');
- return sb.toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6d38fe6..f7068c4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -103,8 +103,10 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
deleted file mode 100644
index 6b21912..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This ExecutorService blocks the submission of new tasks when its queue is
- * already full by using a semaphore. Task submissions require permits, task
- * completions release permits.
- * <p>
- * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
- * contains the thread pool logic, whereas this isolates the semaphore
- * and submit logic for use with other thread pools and delegation models.
- * In particular, it <i>permits multiple per stream executors to share a
- * single per-FS-instance executor; the latter to throttle overall
- * load from the the FS, the others to limit the amount of load which
- * a single output stream can generate.</i>
- * <p>
- * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
- * this s4 threadpool</a>
- */
-@SuppressWarnings("NullableProblems")
-@InterfaceAudience.Private
-class SemaphoredDelegatingExecutor extends
- ForwardingListeningExecutorService {
-
- private final Semaphore queueingPermits;
- private final ListeningExecutorService executorDelegatee;
- private final int permitCount;
-
- /**
- * Instantiate.
- * @param executorDelegatee Executor to delegate to
- * @param permitCount number of permits into the queue permitted
- * @param fair should the semaphore be "fair"
- */
- SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
- int permitCount,
- boolean fair) {
- this.permitCount = permitCount;
- queueingPermits = new Semaphore(permitCount, fair);
- this.executorDelegatee = executorDelegatee;
- }
-
- @Override
- protected ListeningExecutorService delegate() {
- return executorDelegatee;
- }
-
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
- throws InterruptedException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
- long timeout, TimeUnit unit) throws InterruptedException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
- throws InterruptedException, ExecutionException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
- TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Callable<T> task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new CallableWithPermitRelease<>(task));
- }
-
- @Override
- public <T> ListenableFuture<T> submit(Runnable task, T result) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task), result);
- }
-
- @Override
- public ListenableFuture<?> submit(Runnable task) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return Futures.immediateFailedCheckedFuture(e);
- }
- return super.submit(new RunnableWithPermitRelease(task));
- }
-
- @Override
- public void execute(Runnable command) {
- try {
- queueingPermits.acquire();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- super.execute(new RunnableWithPermitRelease(command));
- }
-
- /**
- * Get the number of permits available; guaranteed to be
- * {@code 0 <= availablePermits <= size}.
- * @return the number of permits available at the time of invocation.
- */
- public int getAvailablePermits() {
- return queueingPermits.availablePermits();
- }
-
- /**
- * Get the number of threads waiting to acquire a permit.
- * @return snapshot of the length of the queue of blocked threads.
- */
- public int getWaitingCount() {
- return queueingPermits.getQueueLength();
- }
-
- /**
- * Total number of permits.
- * @return the number of permits as set in the constructor
- */
- public int getPermitCount() {
- return permitCount;
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder(
- "SemaphoredDelegatingExecutor{");
- sb.append("permitCount=").append(getPermitCount());
- sb.append(", available=").append(getAvailablePermits());
- sb.append(", waiting=").append(getWaitingCount());
- sb.append('}');
- return sb.toString();
- }
-
- /**
- * Releases a permit after the task is executed.
- */
- class RunnableWithPermitRelease implements Runnable {
-
- private Runnable delegatee;
-
- public RunnableWithPermitRelease(Runnable delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public void run() {
- try {
- delegatee.run();
- } finally {
- queueingPermits.release();
- }
-
- }
- }
-
- /**
- * Releases a permit after the task is completed.
- */
- class CallableWithPermitRelease<T> implements Callable<T> {
-
- private Callable<T> delegatee;
-
- public CallableWithPermitRelease(Callable<T> delegatee) {
- this.delegatee = delegatee;
- }
-
- @Override
- public T call() throws Exception {
- try {
- return delegatee.call();
- } finally {
- queueingPermits.release();
- }
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
index b1b8240..3dfe286 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.fs.s3a;
import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.StopWatch;
import org.junit.AfterClass;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org