You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2018/01/28 05:00:48 UTC

[3/3] hadoop git commit: HADOOP-15039/HADOOP-15189. Move SemaphoredDelegatingExecutor to hadoop-common Contributed by Genmao Yu

HADOOP-15039/HADOOP-15189. Move SemaphoredDelegatingExecutor to hadoop-common
Contributed by Genmao Yu

(cherry picked from commit 312c5716943b31f0f6363f6df02523f959eb981e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d9875b04
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d9875b04
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d9875b04

Branch: refs/heads/branch-2.9
Commit: d9875b046fe0371fd6a3d54e285d216447c59a08
Parents: d69df0b
Author: Steve Loughran <st...@apache.org>
Authored: Sat Jan 27 21:00:25 2018 -0800
Committer: Steve Loughran <st...@apache.org>
Committed: Sat Jan 27 21:00:25 2018 -0800

----------------------------------------------------------------------
 .../util/BlockingThreadPoolExecutorService.java | 170 ++++++++++++++
 .../util/SemaphoredDelegatingExecutor.java      | 227 ++++++++++++++++++
 .../s3a/BlockingThreadPoolExecutorService.java  | 170 --------------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |   2 +
 .../fs/s3a/SemaphoredDelegatingExecutor.java    | 230 -------------------
 .../ITestBlockingThreadPoolExecutorService.java |   2 +
 6 files changed, 401 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..79128de
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/BlockingThreadPoolExecutorService.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.MoreExecutors;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@InterfaceAudience.Private
+public final class BlockingThreadPoolExecutorService
+    extends SemaphoredDelegatingExecutor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockingThreadPoolExecutorService.class);
+
+  private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+
+  private final ThreadPoolExecutor eventProcessingExecutor;
+
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+   * created thread uniquely,
+   * with a common prefix.
+   *
+   * @param prefix The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  static ThreadFactory getNamedThreadFactory(final String prefix) {
+    SecurityManager s = System.getSecurityManager();
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
+        Thread.currentThread().getThreadGroup();
+
+    return new ThreadFactory() {
+      private final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final int poolNum = POOLNUMBER.getAndIncrement();
+      private final ThreadGroup group = threadGroup;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        final String name =
+            prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+        return new Thread(group, r, name);
+      }
+    };
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   *
+   * @param prefix name prefix for all threads created from the factory
+   * @return a thread factory that creates named daemon threads
+   * with normal priority
+   */
+  public static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+    return new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = namedFactory.newThread(r);
+        if (!t.isDaemon()) {
+          t.setDaemon(true);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+          t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+      }
+
+    };
+  }
+
+  private BlockingThreadPoolExecutorService(int permitCount,
+      ThreadPoolExecutor eventProcessingExecutor) {
+    super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
+        permitCount, false);
+    this.eventProcessingExecutor = eventProcessingExecutor;
+  }
+
+  /**
+   * A thread pool that blocks clients submitting additional tasks if
+   * there are already {@code activeTasks} running threads and {@code
+   * waitingTasks} tasks waiting in its queue.
+   *
+   * @param activeTasks maximum number of active tasks
+   * @param waitingTasks maximum number of waiting tasks
+   * @param keepAliveTime time until threads are cleaned up in {@code unit}
+   * @param unit time unit
+   * @param prefixName prefix of name for threads
+   */
+  public static BlockingThreadPoolExecutorService newInstance(
+      int activeTasks,
+      int waitingTasks,
+      long keepAliveTime, TimeUnit unit,
+      String prefixName) {
+
+    /* Although we generally only expect up to waitingTasks tasks in the
+    queue, we need to be able to buffer all tasks in case dequeueing is
+    slower than enqueueing. */
+    final BlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+    ThreadPoolExecutor eventProcessingExecutor =
+        new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
+            workQueue, newDaemonThreadFactory(prefixName),
+            new RejectedExecutionHandler() {
+              @Override
+              public void rejectedExecution(Runnable r,
+                  ThreadPoolExecutor executor) {
+                // This is not expected to happen.
+                LOG.error("Could not submit task to executor {}",
+                    executor.toString());
+              }
+            });
+    eventProcessingExecutor.allowCoreThreadTimeOut(true);
+    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
+        eventProcessingExecutor);
+  }
+
+  /**
+   * Get the actual number of active threads.
+   * @return the active thread count
+   */
+  int getActiveCount() {
+    return eventProcessingExecutor.getActiveCount();
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "BlockingThreadPoolExecutorService{");
+    sb.append(super.toString());
+    sb.append(", activeCount=").append(getActiveCount());
+    sb.append('}');
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
new file mode 100644
index 0000000..7a1e33a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SemaphoredDelegatingExecutor.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
+ * contains the thread pool logic, whereas this isolates the semaphore
+ * and submit logic for use with other thread pools and delegation models.
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+@SuppressWarnings("NullableProblems")
+@InterfaceAudience.Private
+public class SemaphoredDelegatingExecutor extends
+    ForwardingListeningExecutorService {
+
+  private final Semaphore queueingPermits;
+  private final ListeningExecutorService executorDelegatee;
+  private final int permitCount;
+
+  /**
+   * Instantiate.
+   * @param executorDelegatee Executor to delegate to
+   * @param permitCount total number of permits available to submitted tasks
+   * @param fair should the semaphore be "fair"
+   */
+  public SemaphoredDelegatingExecutor(
+      ListeningExecutorService executorDelegatee,
+      int permitCount,
+      boolean fair) {
+    this.permitCount = permitCount;
+    queueingPermits = new Semaphore(permitCount, fair);
+    this.executorDelegatee = executorDelegatee;
+  }
+
+  @Override
+  protected ListeningExecutorService delegate() {
+    return executorDelegatee;
+  }
+
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+      long timeout, TimeUnit unit) throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+      TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new CallableWithPermitRelease<>(task));
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task), result);
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task));
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    super.execute(new RunnableWithPermitRelease(command));
+  }
+
+  /**
+   * Get the number of permits available; expected to be
+   * {@code 0 <= availablePermits <= permitCount}.
+   * @return the number of permits available at the time of invocation.
+   */
+  public int getAvailablePermits() {
+    return queueingPermits.availablePermits();
+  }
+
+  /**
+   * Get the number of threads waiting to acquire a permit.
+   * @return snapshot of the length of the queue of blocked threads.
+   */
+  public int getWaitingCount() {
+    return queueingPermits.getQueueLength();
+  }
+
+  /**
+   * Total number of permits.
+   * @return the number of permits as set in the constructor
+   */
+  public int getPermitCount() {
+    return permitCount;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "SemaphoredDelegatingExecutor{");
+    sb.append("permitCount=").append(getPermitCount());
+    sb.append(", available=").append(getAvailablePermits());
+    sb.append(", waiting=").append(getWaitingCount());
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  class RunnableWithPermitRelease implements Runnable {
+
+    private Runnable delegatee;
+
+    RunnableWithPermitRelease(Runnable delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public void run() {
+      try {
+        delegatee.run();
+      } finally {
+        queueingPermits.release();
+      }
+
+    }
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  class CallableWithPermitRelease<T> implements Callable<T> {
+
+    private Callable<T> delegatee;
+
+    CallableWithPermitRelease(Callable<T> delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public T call() throws Exception {
+      try {
+        return delegatee.call();
+      } finally {
+        queueingPermits.release();
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
deleted file mode 100644
index f13942d..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-/**
- * This ExecutorService blocks the submission of new tasks when its queue is
- * already full by using a semaphore. Task submissions require permits, task
- * completions release permits.
- * <p>
- * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
- * this s4 threadpool</a>
- */
-@InterfaceAudience.Private
-final class BlockingThreadPoolExecutorService
-    extends SemaphoredDelegatingExecutor {
-
-  private static final Logger LOG = LoggerFactory
-      .getLogger(BlockingThreadPoolExecutorService.class);
-
-  private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
-
-  private final ThreadPoolExecutor eventProcessingExecutor;
-
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
-   * created thread uniquely,
-   * with a common prefix.
-   *
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
-        Thread.currentThread().getThreadGroup();
-
-    return new ThreadFactory() {
-      private final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNum = POOLNUMBER.getAndIncrement();
-      private final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name =
-            prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   *
-   * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads with
-   * the supplied exception handler and normal priority
-   */
-  static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-
-    };
-  }
-
-  private BlockingThreadPoolExecutorService(int permitCount,
-      ThreadPoolExecutor eventProcessingExecutor) {
-    super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
-        permitCount, false);
-    this.eventProcessingExecutor = eventProcessingExecutor;
-  }
-
-  /**
-   * A thread pool that that blocks clients submitting additional tasks if
-   * there are already {@code activeTasks} running threads and {@code
-   * waitingTasks} tasks waiting in its queue.
-   *
-   * @param activeTasks maximum number of active tasks
-   * @param waitingTasks maximum number of waiting tasks
-   * @param keepAliveTime time until threads are cleaned up in {@code unit}
-   * @param unit time unit
-   * @param prefixName prefix of name for threads
-   */
-  public static BlockingThreadPoolExecutorService newInstance(
-      int activeTasks,
-      int waitingTasks,
-      long keepAliveTime, TimeUnit unit,
-      String prefixName) {
-
-    /* Although we generally only expect up to waitingTasks tasks in the
-    queue, we need to be able to buffer all tasks in case dequeueing is
-    slower than enqueueing. */
-    final BlockingQueue<Runnable> workQueue =
-        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
-    ThreadPoolExecutor eventProcessingExecutor =
-        new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
-            workQueue, newDaemonThreadFactory(prefixName),
-            new RejectedExecutionHandler() {
-              @Override
-              public void rejectedExecution(Runnable r,
-                  ThreadPoolExecutor executor) {
-                // This is not expected to happen.
-                LOG.error("Could not submit task to executor {}",
-                    executor.toString());
-              }
-            });
-    eventProcessingExecutor.allowCoreThreadTimeOut(true);
-    return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
-        eventProcessingExecutor);
-  }
-
-  /**
-   * Get the actual number of active threads.
-   * @return the active thread count
-   */
-  int getActiveCount() {
-    return eventProcessingExecutor.getActiveCount();
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder(
-        "BlockingThreadPoolExecutorService{");
-    sb.append(super.toString());
-    sb.append(", activeCount=").append(getActiveCount());
-    sb.append('}');
-    return sb.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6d38fe6..f7068c4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -103,8 +103,10 @@ import org.apache.hadoop.fs.s3a.s3guard.PathMetadata;
 import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.Listing.ACCEPT_ALL;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
deleted file mode 100644
index 6b21912..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/SemaphoredDelegatingExecutor.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.google.common.util.concurrent.ForwardingListeningExecutorService;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * This ExecutorService blocks the submission of new tasks when its queue is
- * already full by using a semaphore. Task submissions require permits, task
- * completions release permits.
- * <p>
- * This is a refactoring of {@link BlockingThreadPoolExecutorService}; that code
- * contains the thread pool logic, whereas this isolates the semaphore
- * and submit logic for use with other thread pools and delegation models.
- * In particular, it <i>permits multiple per stream executors to share a
- * single per-FS-instance executor; the latter to throttle overall
- * load from the the FS, the others to limit the amount of load which
- * a single output stream can generate.</i>
- * <p>
- * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
- * this s4 threadpool</a>
- */
-@SuppressWarnings("NullableProblems")
-@InterfaceAudience.Private
-class SemaphoredDelegatingExecutor extends
-    ForwardingListeningExecutorService {
-
-  private final Semaphore queueingPermits;
-  private final ListeningExecutorService executorDelegatee;
-  private final int permitCount;
-
-  /**
-   * Instantiate.
-   * @param executorDelegatee Executor to delegate to
-   * @param permitCount number of permits into the queue permitted
-   * @param fair should the semaphore be "fair"
-   */
-  SemaphoredDelegatingExecutor(ListeningExecutorService executorDelegatee,
-      int permitCount,
-      boolean fair) {
-    this.permitCount = permitCount;
-    queueingPermits = new Semaphore(permitCount, fair);
-    this.executorDelegatee = executorDelegatee;
-  }
-
-  @Override
-  protected ListeningExecutorService delegate() {
-    return executorDelegatee;
-  }
-
-
-  @Override
-  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
-      throws InterruptedException {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
-      long timeout, TimeUnit unit) throws InterruptedException {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
-      throws InterruptedException, ExecutionException {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
-      TimeUnit unit)
-      throws InterruptedException, ExecutionException, TimeoutException {
-    throw new RuntimeException("Not implemented");
-  }
-
-  @Override
-  public <T> ListenableFuture<T> submit(Callable<T> task) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new CallableWithPermitRelease<>(task));
-  }
-
-  @Override
-  public <T> ListenableFuture<T> submit(Runnable task, T result) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new RunnableWithPermitRelease(task), result);
-  }
-
-  @Override
-  public ListenableFuture<?> submit(Runnable task) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      return Futures.immediateFailedCheckedFuture(e);
-    }
-    return super.submit(new RunnableWithPermitRelease(task));
-  }
-
-  @Override
-  public void execute(Runnable command) {
-    try {
-      queueingPermits.acquire();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
-    super.execute(new RunnableWithPermitRelease(command));
-  }
-
-  /**
-   * Get the number of permits available; guaranteed to be
-   * {@code 0 <= availablePermits <= size}.
-   * @return the number of permits available at the time of invocation.
-   */
-  public int getAvailablePermits() {
-    return queueingPermits.availablePermits();
-  }
-
-  /**
-   * Get the number of threads waiting to acquire a permit.
-   * @return snapshot of the length of the queue of blocked threads.
-   */
-  public int getWaitingCount() {
-    return queueingPermits.getQueueLength();
-  }
-
-  /**
-   * Total number of permits.
-   * @return the number of permits as set in the constructor
-   */
-  public int getPermitCount() {
-    return permitCount;
-  }
-
-  @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder(
-        "SemaphoredDelegatingExecutor{");
-    sb.append("permitCount=").append(getPermitCount());
-    sb.append(", available=").append(getAvailablePermits());
-    sb.append(", waiting=").append(getWaitingCount());
-    sb.append('}');
-    return sb.toString();
-  }
-
-  /**
-   * Releases a permit after the task is executed.
-   */
-  class RunnableWithPermitRelease implements Runnable {
-
-    private Runnable delegatee;
-
-    public RunnableWithPermitRelease(Runnable delegatee) {
-      this.delegatee = delegatee;
-    }
-
-    @Override
-    public void run() {
-      try {
-        delegatee.run();
-      } finally {
-        queueingPermits.release();
-      }
-
-    }
-  }
-
-  /**
-   * Releases a permit after the task is completed.
-   */
-  class CallableWithPermitRelease<T> implements Callable<T> {
-
-    private Callable<T> delegatee;
-
-    public CallableWithPermitRelease(Callable<T> delegatee) {
-      this.delegatee = delegatee;
-    }
-
-    @Override
-    public T call() throws Exception {
-      try {
-        return delegatee.call();
-      } finally {
-        queueingPermits.release();
-      }
-    }
-
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d9875b04/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
index b1b8240..3dfe286 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
 import org.apache.hadoop.util.StopWatch;
 
 import org.junit.AfterClass;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org