You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2016/10/20 19:49:11 UTC

hadoop git commit: HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 6f4192d77 -> 42f8a1d6e


HADOOP-13716. Add LambdaTestUtils class for tests; fix eventual consistency problem in contract test setup. Contributed by Steve Loughran.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/42f8a1d6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/42f8a1d6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/42f8a1d6

Branch: refs/heads/branch-2
Commit: 42f8a1d6eb5add3180094b126c4af2b3ddbc7cae
Parents: 6f4192d
Author: Anu Engineer <ae...@apache.org>
Authored: Thu Oct 20 12:47:57 2016 -0700
Committer: Anu Engineer <ae...@apache.org>
Committed: Thu Oct 20 12:47:57 2016 -0700

----------------------------------------------------------------------
 .../AbstractContractRootDirectoryTest.java      |  48 ++-
 .../hadoop/fs/contract/ContractTestUtils.java   |   6 +-
 .../org/apache/hadoop/test/LambdaTestUtils.java | 428 +++++++++++++++++++
 .../apache/hadoop/test/TestLambdaTestUtils.java | 240 +++++++++++
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java  |   8 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  47 --
 6 files changed, 707 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
index 0a8f464..7295f98 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java
@@ -27,12 +27,16 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -45,6 +49,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
 public abstract class AbstractContractRootDirectoryTest extends AbstractFSContractTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractRootDirectoryTest.class);
+  public static final int OBJECTSTORE_RETRY_TIMEOUT = 30000;
 
   @Override
   public void setup() throws Exception {
@@ -79,23 +84,34 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
     // extra sanity checks here to avoid support calls about complete loss
     // of data
     skipIfUnsupported(TEST_ROOT_TESTS_ENABLED);
-    Path root = new Path("/");
+    final Path root = new Path("/");
     assertIsDirectory(root);
-    // make sure it is clean
-    FileSystem fs = getFileSystem();
-    deleteChildren(fs, root, true);
-    FileStatus[] children = listChildren(fs, root);
-    if (children.length > 0) {
-      StringBuilder error = new StringBuilder();
-      error.append("Deletion of child entries failed, still have")
-          .append(children.length)
-          .append(System.lineSeparator());
-      for (FileStatus child : children) {
-        error.append("  ").append(child.getPath())
-            .append(System.lineSeparator());
-      }
-      fail(error.toString());
-    }
+    // make sure the directory is clean. This includes some retry logic
+    // to forgive blobstores whose listings can be out of sync with the file
+    // status;
+    final FileSystem fs = getFileSystem();
+    final AtomicInteger iterations = new AtomicInteger(0);
+    final FileStatus[] originalChildren = listChildren(fs, root);
+    LambdaTestUtils.evaluate(
+        new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            FileStatus[] deleted = deleteChildren(fs, root, true);
+            FileStatus[] children = listChildren(fs, root);
+            if (children.length > 0) {
+              fail(String.format(
+                  "After %d attempts: listing after rm /* not empty"
+                      + "\n%s\n%s\n%s",
+                  iterations.incrementAndGet(),
+                  dumpStats("final", children),
+                  dumpStats("deleted", deleted),
+                  dumpStats("original", originalChildren)));
+            }
+            return null;
+          }
+        },
+        OBJECTSTORE_RETRY_TIMEOUT,
+        new LambdaTestUtils.LinearRetryInterval(50, 1000));
     // then try to delete the empty one
     boolean deleted = fs.delete(root, false);
     LOG.info("rm / of empty dir result is {}", deleted);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index 16bfb9a..73c8f1c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -400,18 +400,18 @@ public class ContractTestUtils extends Assert {
    * @param fileSystem filesystem
    * @param path path to delete
    * @param recursive flag to indicate child entry deletion should be recursive
-   * @return the number of child entries found and deleted (not including
+   * @return the immediate child entries found and deleted (not including
    * any recursive children of those entries)
    * @throws IOException problem in the deletion process.
    */
-  public static int deleteChildren(FileSystem fileSystem,
+  public static FileStatus[] deleteChildren(FileSystem fileSystem,
       Path path,
       boolean recursive) throws IOException {
     FileStatus[] children = listChildren(fileSystem, path);
     for (FileStatus entry : children) {
       fileSystem.delete(entry.getPath(), recursive);
     }
-    return children.length;
+    return children;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
new file mode 100644
index 0000000..51cdb77
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Class containing methods and associated classes to make the most of Lambda
+ * expressions in Hadoop tests.
+ *
+ * The code has been designed from the outset to be Java-8 friendly, but still
+ * be usable in Java 7.
+ * In particular: support for waiting for a condition to be met.
+ * This is to avoid tests having hard-coded sleeps in them.
+ *
+ * The code is modelled on {@code GenericTestUtils#waitFor(Supplier, int, int)},
+ * but also lifts concepts from Scalatest's {@code awaitResult} and
+ * its notion of pluggable retry logic (simple, backoff, maybe even things
+ * with jitter: test author gets to choose).
+ * The {@code intercept} method is also all credit due Scalatest, though
+ * it's been extended to also support a string message check; useful when
+ * checking the contents of the exception.
+ */
+public final class LambdaTestUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(LambdaTestUtils.class);
+
+  private LambdaTestUtils() {
+  }
+
+  /**
+   * This is the string included in the assertion text in
+   * {@link #intercept(Class, Callable)} if
+   * the closure returned a null value.
+   */
+  public static final String NULL_RESULT = "(null)";
+
+  /**
+   * Interface to implement for converting a timeout into some form
+   * of exception to raise.
+   */
+  public interface TimeoutHandler {
+
+    /**
+     * Create an exception (or throw one, if desired).
+     * @param timeoutMillis timeout which has arisen
+     * @param caught any exception which was caught; may be null
+     * @return an exception which will then be thrown
+     * @throws Exception if the handler wishes to raise an exception
+     * that way.
+     */
+    Exception evaluate(int timeoutMillis, Exception caught) throws Exception;
+  }
+
+  /**
+   * Wait for a condition to be met.
+   * @param check predicate to evaluate
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made.
+   * @param retry retry escalation logic
+   * @param failure handler invoked on failure; the returned exception
+   * will be thrown
+   * @return the number of iterations before the condition was satisfied
+   * @throws Exception returned by {@code failure} on timeout
+   * @throws FailFastException immediately if the evaluated operation raises it
+   * @throws InterruptedException if interrupted.
+   */
+  public static int eventually(Callable<Boolean> check,
+      int timeoutMillis,
+      Callable<Integer> retry,
+      TimeoutHandler failure)
+      throws Exception {
+    Preconditions.checkArgument(timeoutMillis >= 0,
+        "timeoutMillis must be > 0");
+
+    long endTime = Time.now() + timeoutMillis;
+    Exception ex = null;
+    boolean running = true;
+    int iterations = 0;
+    while (running) {
+      iterations++;
+      try {
+        if (check.call()) {
+          return iterations;
+        }
+      } catch (InterruptedException | FailFastException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.debug("eventually() iteration {}", iterations, e);
+        ex = e;
+      }
+      running = Time.now() < endTime;
+      if (running) {
+        int sleeptime = retry.call();
+        if (sleeptime >= 0) {
+          Thread.sleep(sleeptime);
+        } else {
+          running = false;
+        }
+      }
+    }
+    // timeout
+    throw failure.evaluate(timeoutMillis, ex);
+  }
+
+  /**
+   * Simplified {@code eventually()} clause; fixed interval
+   * and {@link GenerateTimeout} used to generate the timeout.
+   * @param check predicate to evaluate
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made.
+   * @param intervalMillis interval in milliseconds between checks
+   * @return the number of iterations before the condition was satisfied
+   * @throws Exception returned by {@code failure} on timeout
+   * @throws FailFastException immediately if the evaluated operation raises it
+   * @throws InterruptedException if interrupted.
+   */
+  public static int eventually(Callable<Boolean> check,
+      int timeoutMillis,
+      int intervalMillis) throws Exception {
+    return eventually(check,
+        timeoutMillis,
+        new FixedRetryInterval(intervalMillis),
+        new GenerateTimeout());
+  }
+
+  /**
+   * Await a result; exceptions are caught and, with one exception,
+   * trigger a sleep and retry. This is similar of ScalaTest's
+   * {@code Await.result()} operation, though that lacks the ability to
+   * fail fast if the inner closure has determined that a failure condition
+   * is non-recoverable.
+   * @param eval expression to evaluate
+   * @param timeoutMillis timeout in milliseconds.
+   * Can be zero, in which case only one attempt is made.
+   * @param retry retry interval generator
+   * @param <T> return type
+   * @return result of the first successful eval call
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised -without any retry attempt.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   */
+  public static <T> T evaluate(Callable<T> eval,
+      int timeoutMillis,
+      Callable<Integer> retry) throws Exception {
+    Preconditions.checkArgument(timeoutMillis >= 0,
+        "timeoutMillis must be >= 0");
+    long endTime = Time.now() + timeoutMillis;
+    Exception ex;
+    boolean running;
+    int sleeptime;
+    int iterations = 0;
+    do {
+      iterations++;
+      try {
+        return eval.call();
+      } catch (InterruptedException | FailFastException e) {
+        throw e;
+      } catch (Exception e) {
+        LOG.debug("evaluate() iteration {}", iterations, e);
+        ex = e;
+      }
+      running = Time.now() < endTime;
+      if (running && (sleeptime = retry.call()) >= 0) {
+        Thread.sleep(sleeptime);
+      }
+    } while (running);
+    // timeout. Throw the last exception raised
+    throw ex;
+  }
+
+  /**
+   * Simplified {@code evaluate()} clause; fixed interval.
+   * @param check predicate to evaluate
+   * @param timeoutMillis wait interval between check failures
+   * @param intervalMillis interval in milliseconds
+   * @return result of the first successful eval call
+   * @throws Exception the last exception thrown before timeout was triggered
+   * @throws FailFastException if raised -without any retry attempt.
+   * @throws InterruptedException if interrupted during the sleep operation.
+   */
+  public static <T> T evaluate(Callable<T> eval,
+      int timeoutMillis,
+      int intervalMillis) throws Exception {
+    return evaluate(eval,
+        timeoutMillis,
+        new FixedRetryInterval(intervalMillis));
+  }
+
+  /**
+   * Intercept an exception; raise an exception if it was not raised.
+   * Exceptions of the wrong class are also rethrown.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   * The error includes the {@code toString()} value of the result, if this
+   * can be determined.
+   */
+  public static <T, E extends Throwable> E intercept(
+      Class<E> clazz,
+      Callable<T> eval)
+      throws Exception {
+    try {
+      T result = eval.call();
+      throw new AssertionError("Expected an exception, got "
+          + robustToString(result));
+    } catch (Throwable e) {
+      if (clazz.isAssignableFrom(e.getClass())) {
+        return (E)e;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Intercept an exception; raise an exception if it was not raised.
+   * Exceptions of the wrong class are also rethrown.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type and contents
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   * The error includes the {@code toString()} value of the result, if this
+   * can be determined.
+   */
+  public static <T, E extends Throwable> E intercept(
+      Class<E> clazz,
+      String contained,
+      Callable<T> eval)
+      throws Exception {
+    E ex = intercept(clazz, eval);
+    GenericTestUtils.assertExceptionContains(contained, ex);
+    return ex;
+  }
+
+  /**
+   * Robust string converter for exception messages; if the {@code toString()}
+   * method throws an exception then that exception is caught and logged,
+   * then a simple string of the classname logged.
+   * This stops a toString() failure hiding underlying problems in the code.
+   * @param o object to stringify
+   * @return a string for exception messages
+   */
+  private static String robustToString(Object o) {
+    if (o == null) {
+      return NULL_RESULT;
+    } else {
+      try {
+        return o.toString();
+      } catch (Exception e) {
+        LOG.info("Exception calling toString()", e);
+        return o.getClass().toString();
+      }
+    }
+  }
+
+  /**
+   * Returns {@code TimeoutException} on a timeout. If
+   * there was a inner class passed in, includes it as the
+   * inner failure.
+   */
+  public static class GenerateTimeout implements TimeoutHandler {
+    private final String message;
+
+    public GenerateTimeout(String message) {
+      this.message = message;
+    }
+
+    public GenerateTimeout() {
+      this("timeout");
+    }
+
+    /**
+     * Evaluate by creating a new Timeout Exception.
+     * @param timeoutMillis timeout in millis
+     * @param caught optional caught exception
+     * @return TimeoutException
+     */
+    @Override
+    public Exception evaluate(int timeoutMillis, Exception caught)
+        throws Exception {
+      String s = String.format("%s: after %d millis", message,
+          timeoutMillis);
+      String caughtText = caught != null
+          ? ("; " + robustToString(caught)) : "";
+
+      return (TimeoutException) (new TimeoutException(s + caughtText)
+                                     .initCause(caught));
+    }
+  }
+
+  /**
+   * Retry at a fixed time period between calls.
+   */
+  public static class FixedRetryInterval implements Callable<Integer> {
+    private final int intervalMillis;
+    private int invocationCount = 0;
+
+    public FixedRetryInterval(int intervalMillis) {
+      Preconditions.checkArgument(intervalMillis > 0);
+      this.intervalMillis = intervalMillis;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      invocationCount++;
+      return intervalMillis;
+    }
+
+    public int getInvocationCount() {
+      return invocationCount;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "FixedRetryInterval{");
+      sb.append("interval=").append(intervalMillis);
+      sb.append(", invocationCount=").append(invocationCount);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Gradually increase the sleep time by the initial interval, until
+   * the limit set by {@code maxIntervalMillis} is reached.
+   */
+  public static class LinearRetryInterval implements Callable<Integer> {
+    private final int intervalMillis;
+    private final int maxIntervalMillis;
+    private int current;
+    private int invocationCount = 0;
+
+    public LinearRetryInterval(int intervalMillis, int maxIntervalMillis) {
+      Preconditions.checkArgument(intervalMillis > 0);
+      Preconditions.checkArgument(maxIntervalMillis > 0);
+      this.intervalMillis = intervalMillis;
+      this.current = intervalMillis;
+      this.maxIntervalMillis = maxIntervalMillis;
+    }
+
+    @Override
+    public Integer call() throws Exception {
+      invocationCount++;
+      int last = current;
+      if (last < maxIntervalMillis) {
+        current += intervalMillis;
+      }
+      return last;
+    }
+
+    public int getInvocationCount() {
+      return invocationCount;
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "LinearRetryInterval{");
+      sb.append("interval=").append(intervalMillis);
+      sb.append(", current=").append(current);
+      sb.append(", limit=").append(maxIntervalMillis);
+      sb.append(", invocationCount=").append(invocationCount);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+
+  /**
+   * An exception which triggers a fast exist from the
+   * {@link #evaluate(Callable, int, Callable)} and
+   * {@link #eventually(Callable, int, Callable, TimeoutHandler)} loops.
+   */
+  public static class FailFastException extends Exception {
+
+    public FailFastException(String detailMessage) {
+      super(detailMessage);
+    }
+
+    public FailFastException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    /**
+     * Instantiate from a format string.
+     * @param format format string
+     * @param args arguments to format
+     * @return an instance with the message string constructed.
+     */
+    public static FailFastException newInstance(String format, Object...args) {
+      return new FailFastException(String.format(format, args));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
new file mode 100644
index 0000000..a2af864
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/TestLambdaTestUtils.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.test;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+import static org.apache.hadoop.test.GenericTestUtils.*;
+
+/**
+ * Test the logic in {@link LambdaTestUtils}.
+ * This test suite includes Java 8 and Java 7 code; the Java 8 code exists
+ * to verify that the API is easily used with Lambda expressions.
+ */
+public class TestLambdaTestUtils extends Assert {
+
+  public static final int INTERVAL = 10;
+  public static final int TIMEOUT = 50;
+  private FixedRetryInterval retry = new FixedRetryInterval(INTERVAL);
+  // counter for lambda expressions to use
+  private int count;
+
+  /**
+   * Always evaluates to true.
+   */
+  public static final Callable<Boolean> ALWAYS_TRUE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return true;
+        }
+      };
+
+  /**
+   * Always evaluates to false.
+   */
+  public static final Callable<Boolean> ALWAYS_FALSE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          return false;
+        }
+      };
+
+  /**
+   * Text in the raised FNFE.
+   */
+  public static final String MISSING = "not found";
+
+  /**
+   * A predicate that always throws a FileNotFoundException.
+   */
+  public static final Callable<Boolean> ALWAYS_FNFE =
+      new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws Exception {
+          throw new FileNotFoundException(MISSING);
+        }
+      };
+
+  /**
+   * reusable timeout handler.
+   */
+  public static final GenerateTimeout
+      TIMEOUT_FAILURE_HANDLER = new GenerateTimeout();
+
+  /**
+   * Always evaluates to 3L.
+   */
+  public static final Callable<Long> EVAL_3L = new Callable<Long>() {
+    @Override
+    public Long call() throws Exception {
+      return 3L;
+    }
+  };
+
+  /**
+   * Always raises a {@code FileNotFoundException}.
+   */
+  public static final Callable<Long> EVAL_FNFE = new Callable<Long>() {
+    @Override
+    public Long call() throws Exception {
+      throw new FileNotFoundException(MISSING);
+    }
+  };
+
+  @Test
+  public void testEventuallyAlwaysTrue() throws Throwable {
+    eventually(
+        ALWAYS_TRUE,
+        TIMEOUT,
+        new FixedRetryInterval(INTERVAL),
+        TIMEOUT_FAILURE_HANDLER);
+  }
+
+  @Test
+  public void testEventuallyAlwaysFalse() throws Throwable {
+    try {
+      eventually(
+          ALWAYS_FALSE,
+          TIMEOUT,
+          retry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      assertTrue(retry.getInvocationCount() > 4);
+    }
+  }
+
+  @Test
+  public void testEventuallyLinearRetry() throws Throwable {
+    LinearRetryInterval linearRetry = new LinearRetryInterval(
+        INTERVAL * 2,
+        TIMEOUT * 2);
+    try {
+      eventually(
+          ALWAYS_FALSE,
+          TIMEOUT,
+          linearRetry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      assertEquals(linearRetry.toString(),
+          2, linearRetry.getInvocationCount());
+    }
+  }
+
+  @Test
+  public void testEventuallyFNFE() throws Throwable {
+    try {
+      eventually(
+          ALWAYS_FNFE,
+          TIMEOUT,
+          retry,
+          TIMEOUT_FAILURE_HANDLER);
+      fail("should not have got here");
+    } catch (TimeoutException e) {
+      // inner clause is included
+      assertTrue(retry.getInvocationCount() > 0);
+      assertTrue(e.getCause() instanceof FileNotFoundException);
+      assertExceptionContains(MISSING, e);
+    }
+  }
+
+  @Test
+  public void testEvaluate() throws Throwable {
+    long result = evaluate(EVAL_3L,
+        TIMEOUT,
+        retry);
+    assertEquals(3, result);
+    assertEquals(0, retry.getInvocationCount());
+  }
+
+  @Test
+  public void testEvalFailuresRetry() throws Throwable {
+    try {
+      evaluate(EVAL_FNFE,
+          TIMEOUT,
+          retry);
+      fail("should not have got here");
+    } catch (IOException expected) {
+      // expected
+      assertMinRetryCount(1);
+    }
+  }
+
+  @Test
+  public void testLinearRetryInterval() throws Throwable {
+    LinearRetryInterval interval = new LinearRetryInterval(200, 1000);
+    assertEquals(200, (int) interval.call());
+    assertEquals(400, (int) interval.call());
+    assertEquals(600, (int) interval.call());
+    assertEquals(800, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+    assertEquals(1000, (int) interval.call());
+  }
+
+  @Test
+  public void testInterceptSuccess() throws Throwable {
+    IOException ioe = intercept(IOException.class, ALWAYS_FNFE);
+    assertExceptionContains(MISSING, ioe);
+  }
+
+  @Test
+  public void testInterceptContainsSuccess() throws Throwable {
+    intercept(IOException.class, MISSING, ALWAYS_FNFE);
+  }
+
+  @Test
+  public void testInterceptContainsWrongString() throws Throwable {
+    try {
+      FileNotFoundException e =
+          intercept(FileNotFoundException.class, "404", ALWAYS_FNFE);
+      throw e;
+    } catch (AssertionError expected) {
+      assertExceptionContains(MISSING, expected);
+    }
+  }
+
+  /**
+   * Assert the retry count is as expected.
+   * @param expected expected value
+   */
+  protected void assertRetryCount(int expected) {
+    assertEquals(retry.toString(), expected, retry.getInvocationCount());
+  }
+
+  /**
+   * Assert the retry count is as expected.
+   * @param minCount minimum value
+   */
+  protected void assertMinRetryCount(int minCount) {
+    assertTrue("retry count of " + retry
+            + " is not >= " + minCount,
+        minCount <= retry.getInvocationCount());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 0686488..728b1a9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.test.LambdaTestUtils;
+
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,8 +36,6 @@ import java.io.FileNotFoundException;
 import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
-import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
  * Test S3A Failure translation, including a functional test
@@ -68,13 +68,13 @@ public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
       writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
       // here the file length is less. Probe the file to see if this is true,
       // with a spin and wait
-      eventually(30 *1000, new Callable<Void>() {
+      LambdaTestUtils.evaluate(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
           assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
           return null;
         }
-      });
+      }, 30 * 1000, 1000);
       // here length is shorter. Assuming it has propagated to all replicas,
       // the position of the input stream is now beyond the EOF.
       // An attempt to seek backwards to a position greater than the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/42f8a1d6/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index c67e118..249ba10 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -136,32 +136,6 @@ public class S3ATestUtils {
   }
 
   /**
-   * Repeatedly attempt a callback until timeout or a {@link FailFastException}
-   * is raised. This is modeled on ScalaTests {@code eventually(Closure)} code.
-   * @param timeout timeout
-   * @param callback callback to invoke
-   * @throws FailFastException any fast-failure
-   * @throws Exception the exception which caused the iterator to fail
-   */
-  public static void eventually(int timeout, Callable<Void> callback)
-      throws Exception {
-    Exception lastException;
-    long endtime = System.currentTimeMillis() + timeout;
-    do {
-      try {
-        callback.call();
-        return;
-      } catch (InterruptedException | FailFastException e) {
-        throw e;
-      } catch (Exception e) {
-        lastException = e;
-      }
-      Thread.sleep(500);
-    } while (endtime > System.currentTimeMillis());
-    throw lastException;
-  }
-
-  /**
    * patch the endpoint option so that irrespective of where other tests
    * are working, the IO performance tests can work with the landsat
    * images.
@@ -291,27 +265,6 @@ public class S3ATestUtils {
   }
 
   /**
-   * The exception to raise so as to exit fast from
-   * {@link #eventually(int, Callable)}.
-   */
-  public static class FailFastException extends Exception {
-    public FailFastException() {
-    }
-
-    public FailFastException(String message) {
-      super(message);
-    }
-
-    public FailFastException(String message, Throwable cause) {
-      super(message, cause);
-    }
-
-    public FailFastException(Throwable cause) {
-      super(cause);
-    }
-  }
-
-  /**
    * Verify the class of an exception. If it is not as expected, rethrow it.
    * Comparison is on the exact class, not subclass-of inference as
    * offered by {@code instanceof}.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org