You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2022/04/04 14:44:39 UTC

[hadoop] branch feature-HADOOP-18028-s3a-prefetch updated: HADOOP-18180. Replace use of twitter util-core with java futures in S3A prefetching stream (#4115)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/feature-HADOOP-18028-s3a-prefetch by this push:
     new 3aa03e0eb95 HADOOP-18180. Replace use of twitter util-core with java futures in S3A prefetching stream (#4115)
3aa03e0eb95 is described below

commit 3aa03e0eb95bbcb066144706e06509f0e0549196
Author: PJ Fanning <pj...@users.noreply.github.com>
AuthorDate: Mon Apr 4 16:44:23 2022 +0200

    HADOOP-18180. Replace use of twitter util-core with java futures in S3A prefetching stream (#4115)
    
    
    Contributed by PJ Fanning.
---
 hadoop-tools/hadoop-aws/pom.xml                    |  6 --
 .../org/apache/hadoop/fs/common/BufferData.java    |  7 +-
 .../org/apache/hadoop/fs/common/BufferPool.java    |  5 +-
 .../hadoop/fs/common/CachingBlockManager.java      | 34 ++++----
 .../fs/common/ExecutorServiceFuturePool.java       | 70 ++++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  5 +-
 .../org/apache/hadoop/fs/s3a/S3AReadOpContext.java | 15 ++--
 .../hadoop/fs/s3a/read/S3CachingBlockManager.java  |  4 +-
 .../hadoop/fs/s3a/read/S3CachingInputStream.java   |  4 +-
 .../apache/hadoop/fs/common/TestBufferData.java    | 10 ++-
 .../fs/common/TestExecutorServiceFuturePool.java   | 92 ++++++++++++++++++++++
 .../java/org/apache/hadoop/fs/s3a/read/Fakes.java  | 14 ++--
 .../fs/s3a/read/TestS3CachingBlockManager.java     |  7 +-
 .../org/apache/hadoop/fs/s3a/read/TestS3File.java  |  5 +-
 .../hadoop/fs/s3a/read/TestS3InputStream.java      |  5 +-
 15 files changed, 217 insertions(+), 66 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 2e6b8ff3595..5583bb7ad05 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -435,12 +435,6 @@
       <artifactId>aws-java-sdk-bundle</artifactId>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>com.twitter</groupId>
-      <artifactId>util-core_2.11</artifactId>
-      <version>21.2.0</version>
-      <scope>compile</scope>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java
index 34dd6d7ba3b..a855a1c2c39 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferData.java
@@ -22,10 +22,9 @@ package org.apache.hadoop.fs.common;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Future;
 import java.util.zip.CRC32;
 
-import com.twitter.util.Awaitable.CanAwait;
-import com.twitter.util.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -263,8 +262,6 @@ public class BufferData {
     return false;
   }
 
-  private static final CanAwait CAN_AWAIT = () -> false;
-
   public String toString() {
 
     return String.format(
@@ -281,7 +278,7 @@ public class BufferData {
     if (f == null) {
       return "--";
     } else {
-      return this.action.isReady(CAN_AWAIT) ? "done" : "not done";
+      return this.action.isDone() ? "done" : "not done";
     }
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
index 91798e55006..b151ed439af 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/BufferPool.java
@@ -26,9 +26,8 @@ import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
 
-import com.twitter.util.Future;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -233,7 +232,7 @@ public class BufferPool implements Closeable {
     for (BufferData data : this.getAll()) {
       Future<Void> actionFuture = data.getActionFuture();
       if (actionFuture != null) {
-        actionFuture.raise(new CancellationException("BufferPool is closing."));
+        actionFuture.cancel(true);
       }
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
index 93417f3fe61..44c2df29138 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.fs.common;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
-import com.twitter.util.Await;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,9 +37,10 @@ import org.slf4j.LoggerFactory;
  */
 public abstract class CachingBlockManager extends BlockManager {
   private static final Logger LOG = LoggerFactory.getLogger(CachingBlockManager.class);
+  private static final int TIMEOUT_MINUTES = 60;
 
   // Asynchronous tasks are performed in this pool.
-  private final FuturePool futurePool;
+  private final ExecutorServiceFuturePool futurePool;
 
   // Pool of shared ByteBuffer instances.
   private BufferPool bufferPool;
@@ -78,7 +79,7 @@ public abstract class CachingBlockManager extends BlockManager {
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
   public CachingBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       BlockData blockData,
       int bufferPoolSize) {
     super(blockData);
@@ -247,7 +248,7 @@ public abstract class CachingBlockManager extends BlockManager {
 
       BlockOperations.Operation op = this.ops.requestPrefetch(blockNumber);
       PrefetchTask prefetchTask = new PrefetchTask(data, this);
-      Future<Void> prefetchFuture = this.futurePool.apply(prefetchTask);
+      Future<Void> prefetchFuture = this.futurePool.executeFunction(prefetchTask);
       data.setPrefetch(prefetchFuture);
       this.ops.end(op);
     }
@@ -344,7 +345,7 @@ public abstract class CachingBlockManager extends BlockManager {
   /**
    * Read task that is submitted to the future pool.
    */
-  private static class PrefetchTask extends ExceptionalFunction0<Void> {
+  private static class PrefetchTask implements Supplier<Void> {
     private final BufferData data;
     private final CachingBlockManager blockManager;
 
@@ -354,7 +355,7 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     @Override
-    public Void applyE() {
+    public Void get() {
       try {
         this.blockManager.prefetch(data);
       } catch (Exception e) {
@@ -412,11 +413,13 @@ public abstract class CachingBlockManager extends BlockManager {
       if (state == BufferData.State.PREFETCHING) {
         blockFuture = data.getActionFuture();
       } else {
-        blockFuture = Future.value(null);
+        CompletableFuture<Void> cf = new CompletableFuture<>();
+        cf.complete(null);
+        blockFuture = cf;
       }
 
       CachePutTask task = new CachePutTask(data, blockFuture, this);
-      Future<Void> actionFuture = this.futurePool.apply(task);
+      Future<Void> actionFuture = this.futurePool.executeFunction(task);
       data.setCaching(actionFuture);
       this.ops.end(op);
     }
@@ -433,14 +436,13 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     try {
-      Await.result(blockFuture);
+      blockFuture.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
       if (data.stateEqualsOneOf(BufferData.State.DONE)) {
         // There was an error during prefetch.
         return;
       }
     } catch (Exception e) {
-      String message = String.format("error waitng on blockFuture: %s", data);
-      LOG.error(message, e);
+      LOG.error("error waiting on blockFuture: {}", data, e);
       data.setDone();
       return;
     }
@@ -500,7 +502,7 @@ public abstract class CachingBlockManager extends BlockManager {
     this.cache.put(blockNumber, buffer);
   }
 
-  private static class CachePutTask extends ExceptionalFunction0<Void> {
+  private static class CachePutTask implements Supplier<Void> {
     private final BufferData data;
 
     // Block being asynchronously fetched.
@@ -519,7 +521,7 @@ public abstract class CachingBlockManager extends BlockManager {
     }
 
     @Override
-    public Void applyE() {
+    public Void get() {
       this.blockManager.addToCacheAndRelease(this.data, this.blockFuture);
       return null;
     }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java
new file mode 100644
index 00000000000..932a047c940
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/ExecutorServiceFuturePool.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.Locale;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+
+/**
+ * A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
+ *
+ * If a piece of work has started, it cannot (currently) be cancelled.
+ *
+ * This class is a simplified version of <code>com.twitter:util-core_2.11</code>
+ * ExecutorServiceFuturePool designed to avoid depending on that Scala library.
+ * One problem with using a Scala library is that many downstream projects
+ * (eg Apache Spark) use Scala, and they might want to use a different version of Scala
+ * from the version that Hadoop chooses to use.
+ *
+ */
+public class ExecutorServiceFuturePool {
+  private ExecutorService executor;
+
+  public ExecutorServiceFuturePool(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * @param f function to run in future on executor pool
+   * @return future
+   * @throws java.util.concurrent.RejectedExecutionException can be thrown
+   * @throws NullPointerException if f param is null
+   */
+  public Future<Void> executeFunction(final Supplier<Void> f) {
+    return executor.submit(f::get);
+  }
+
+  /**
+   * @param r runnable to run in future on executor pool
+   * @return future
+   * @throws java.util.concurrent.RejectedExecutionException can be thrown
+   * @throws NullPointerException if r param is null
+   */
+  @SuppressWarnings("unchecked")
+  public Future<Void> executeRunnable(final Runnable r) {
+    return (Future<Void>) executor.submit(r::run);
+  }
+
+  public String toString() {
+    return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 35c26067fcc..76fe678073d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -76,8 +76,7 @@ import com.amazonaws.services.s3.transfer.model.CopyResult;
 import com.amazonaws.services.s3.transfer.model.UploadResult;
 import com.amazonaws.event.ProgressListener;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -283,7 +282,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private ThreadPoolExecutor unboundedThreadPool;
 
   // S3 reads are prefetched asynchronously using this future pool.
-  private FuturePool futurePool;
+  private ExecutorServiceFuturePool futurePool;
 
   // If true, the prefetching input stream is used for reads.
   private boolean prefetchEnabled;
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
index acfe6a415f1..cfbd9921d80 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AReadOpContext.java
@@ -18,11 +18,10 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import com.twitter.util.FuturePool;
-
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.store.audit.AuditSpan;
@@ -61,7 +60,7 @@ public class S3AReadOpContext extends S3AOpContext {
   private final AuditSpan auditSpan;
 
   // S3 reads are prefetched asynchronously using this future pool.
-  private FuturePool futurePool;
+  private ExecutorServiceFuturePool futurePool;
 
   // Size in bytes of a single prefetch block.
   private final int prefetchBlockSize;
@@ -80,7 +79,7 @@ public class S3AReadOpContext extends S3AOpContext {
    * @param changeDetectionPolicy change detection policy.
    * @param readahead readahead for GET operations/skip, etc.
    * @param auditSpan active audit
-   * @param futurePool the FuturePool instance used by async prefetches.
+   * @param futurePool the ExecutorServiceFuturePool instance used by async prefetches.
    * @param prefetchBlockSize the size (in number of bytes) of each prefetched block.
    * @param prefetchBlockCount maximum number of prefetched blocks.
    */
@@ -94,7 +93,7 @@ public class S3AReadOpContext extends S3AOpContext {
       ChangeDetectionPolicy changeDetectionPolicy,
       final long readahead,
       final AuditSpan auditSpan,
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       int prefetchBlockSize,
       int prefetchBlockCount) {
 
@@ -161,11 +160,11 @@ public class S3AReadOpContext extends S3AOpContext {
   }
 
   /**
-   * Gets the {@code FuturePool} used for asynchronous prefetches.
+   * Gets the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
    *
-   * @return the {@code FuturePool} used for asynchronous prefetches.
+   * @return the {@code ExecutorServiceFuturePool} used for asynchronous prefetches.
    */
-  public FuturePool getFuturePool() {
+  public ExecutorServiceFuturePool getFuturePool() {
     return this.futurePool;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
index c4fafd56f1d..674a5ccbdd8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingBlockManager.java
@@ -22,12 +22,12 @@ package org.apache.hadoop.fs.s3a.read;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.CachingBlockManager;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.common.Validate;
 
 /**
@@ -52,7 +52,7 @@ public class S3CachingBlockManager extends CachingBlockManager {
    * @throws IllegalArgumentException if reader is null.
    */
   public S3CachingBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       S3Reader reader,
       BlockData blockData,
       int bufferPoolSize) {
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
index 11170025268..a1a9a22448a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3CachingInputStream.java
@@ -21,13 +21,13 @@ package org.apache.hadoop.fs.s3a.read;
 
 import java.io.IOException;
 
-import com.twitter.util.FuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.BlockManager;
 import org.apache.hadoop.fs.common.BufferData;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -186,7 +186,7 @@ public class S3CachingInputStream extends S3InputStream {
   }
 
   protected BlockManager createBlockManager(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       S3Reader reader,
       BlockData blockData,
       int bufferPoolSize) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java
index 119e90ffeba..c4699d11540 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferData.java
@@ -24,8 +24,8 @@ import java.nio.ReadOnlyBufferException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 
-import com.twitter.util.Future;
 import org.junit.Test;
 
 import org.apache.hadoop.test.AbstractHadoopTestBase;
@@ -83,13 +83,14 @@ public class TestBufferData extends AbstractHadoopTestBase {
 
     assertEquals(BufferData.State.BLANK, data.getState());
 
-    Future<Void> actionFuture = Future.value(null);
+    CompletableFuture<Void> actionFuture = new CompletableFuture<>();
+    actionFuture.complete(null);
     data.setPrefetch(actionFuture);
     assertEquals(BufferData.State.PREFETCHING, data.getState());
     assertNotNull(data.getActionFuture());
     assertSame(actionFuture, data.getActionFuture());
 
-    Future<Void> actionFuture2 = Future.value(null);
+    CompletableFuture<Void> actionFuture2 = new CompletableFuture<>();
     data.setCaching(actionFuture2);
     assertEquals(BufferData.State.CACHING, data.getState());
     assertNotNull(data.getActionFuture());
@@ -117,7 +118,8 @@ public class TestBufferData extends AbstractHadoopTestBase {
 
   @Test
   public void testInvalidStateUpdates() throws Exception {
-    Future<Void> actionFuture = Future.value(null);
+    CompletableFuture<Void> actionFuture = new CompletableFuture<>();
+    actionFuture.complete(null);
     testInvalidStateUpdatesHelper(
         (d) -> d.setPrefetch(actionFuture),
         BufferData.State.BLANK,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java
new file mode 100644
index 00000000000..00055a9ea1e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestExecutorServiceFuturePool.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.common;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.junit.Assert.assertTrue;
+
+public class TestExecutorServiceFuturePool extends AbstractHadoopTestBase {
+
+  private ExecutorService executorService;
+
+  @Before
+  public void setUp() {
+    executorService = Executors.newFixedThreadPool(3);
+  }
+
+  @After
+  public void tearDown() {
+    if (executorService != null) {
+      executorService.shutdownNow();
+    }
+  }
+
+  @Test
+  public void testRunnableSucceeds() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+    Future<Void> future = futurePool.executeRunnable(() -> atomicBoolean.set(true));
+    future.get(30, TimeUnit.SECONDS);
+    assertTrue("atomicBoolean set to true?", atomicBoolean.get());
+  }
+
+  @Test
+  public void testSupplierSucceeds() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+    Future<Void> future = futurePool.executeFunction(() -> {
+      atomicBoolean.set(true);
+      return null;
+    });
+    future.get(30, TimeUnit.SECONDS);
+    assertTrue("atomicBoolean set to true?", atomicBoolean.get());
+  }
+
+  @Test
+  public void testRunnableFails() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    Future<Void> future = futurePool.executeRunnable(() -> {
+      throw new IllegalStateException("deliberate");
+    });
+    LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testSupplierFails() throws Exception {
+    ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(executorService);
+    Future<Void> future = futurePool.executeFunction(() -> {
+      throw new IllegalStateException("deliberate");
+    });
+    LambdaTestUtils.intercept(ExecutionException.class, () -> future.get(30, TimeUnit.SECONDS));
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
index 5c2f7eb2241..1b02c495bc4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/Fakes.java
@@ -34,11 +34,11 @@ import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
-import com.twitter.util.FuturePool;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.common.BlockCache;
 import org.apache.hadoop.fs.common.BlockData;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.common.SingleFilePerBlockCache;
 import org.apache.hadoop.fs.common.Validate;
 import org.apache.hadoop.fs.s3a.Invoker;
@@ -109,7 +109,7 @@ public final class Fakes {
   }
 
   public static S3AReadOpContext createReadContext(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String key,
       int fileSize,
       int prefetchBlockSize,
@@ -195,7 +195,7 @@ public final class Fakes {
 
   public static S3InputStream createInputStream(
       Class<? extends S3InputStream> clazz,
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize,
@@ -225,7 +225,7 @@ public final class Fakes {
   }
 
   public static TestS3InMemoryInputStream createS3InMemoryInputStream(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize) {
@@ -235,7 +235,7 @@ public final class Fakes {
   }
 
   public static TestS3CachingInputStream createS3CachingInputStream(
-      FuturePool futurePool,
+      ExecutorServiceFuturePool futurePool,
       String bucket,
       String key,
       int fileSize,
@@ -322,7 +322,7 @@ public final class Fakes {
 
   public static class TestS3CachingBlockManager extends S3CachingBlockManager {
     public TestS3CachingBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {
@@ -359,7 +359,7 @@ public final class Fakes {
 
     @Override
     protected S3CachingBlockManager createBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
index 99836793dec..3f84e2e0283 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3CachingBlockManager.java
@@ -24,13 +24,12 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.common.BlockData;
 import org.apache.hadoop.fs.common.BufferData;
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.junit.Assert.assertEquals;
@@ -41,7 +40,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
   static final int POOL_SIZE = 3;
 
   private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
 
   private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
 
@@ -106,7 +105,7 @@ public class TestS3CachingBlockManager extends AbstractHadoopTestBase {
    */
   static class TestBlockManager extends S3CachingBlockManager {
     TestBlockManager(
-        FuturePool futurePool,
+        ExecutorServiceFuturePool futurePool,
         S3Reader reader,
         BlockData blockData,
         int bufferPoolSize) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
index 2f555d2b62c..9f63ea0a889 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3File.java
@@ -22,11 +22,10 @@ package org.apache.hadoop.fs.s3a.read;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -36,7 +35,7 @@ import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 public class TestS3File extends AbstractHadoopTestBase {
   private final ExecutorService threadPool = Executors.newFixedThreadPool(1);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
   private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
index 503cd699002..010bc1c30b6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java
@@ -24,12 +24,11 @@ import java.io.IOException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FuturePool;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.common.ExceptionAsserts;
+import org.apache.hadoop.fs.common.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
@@ -45,7 +44,7 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
   private static final int FILE_SIZE = 10;
 
   private final ExecutorService threadPool = Executors.newFixedThreadPool(4);
-  private final FuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
+  private final ExecutorServiceFuturePool futurePool = new ExecutorServiceFuturePool(threadPool);
   private final S3AInputStream.InputStreamCallbacks client = MockS3File.createClient("bucket");
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org