You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/06 08:19:17 UTC
flink git commit: [FLINK-7576] [futures] Add
FutureUtils.retryWithDelay
Repository: flink
Updated Branches:
refs/heads/master 6d2124ee2 -> 769ce2a07
[FLINK-7576] [futures] Add FutureUtils.retryWithDelay
FutureUtils.retryWithDelay executes the given operation of type
Callable<CompletableFuture<T>> n times and waits in between retries the given
delay. This allows to retry an operation with a specified delay.
Make retry and retry with delay future properly cancellable
This closes #4637.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/769ce2a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/769ce2a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/769ce2a0
Branch: refs/heads/master
Commit: 769ce2a07424712d62b6ead87c10aee08d85c216
Parents: 6d2124e
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 4 16:42:24 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 6 10:18:52 2017 +0200
----------------------------------------------------------------------
.../flink/runtime/concurrent/FutureUtils.java | 171 +++++++--
.../runtime/concurrent/ConjunctFutureTest.java | 246 +++++++++++++
.../runtime/concurrent/FutureUtilsTest.java | 359 +++++++++----------
3 files changed, 554 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 5c6439d..b982c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -26,13 +26,15 @@ import akka.dispatch.OnComplete;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Supplier;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
@@ -48,6 +50,7 @@ public class FutureUtils {
// retrying operations
// ------------------------------------------------------------------------
+
/**
* Retry the given operation the given number of times in case of a failure.
*
@@ -58,35 +61,135 @@ public class FutureUtils {
* @return Future containing either the result of the operation or a {@link RetryException}
*/
public static <T> CompletableFuture<T> retry(
- final Callable<CompletableFuture<T>> operation,
- final int retries,
- final Executor executor) {
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Executor executor) {
+
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
- CompletableFuture<T> operationResultFuture;
+ retryOperation(resultFuture, operation, retries, executor);
+
+ return resultFuture;
+ }
- try {
- operationResultFuture = operation.call();
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
+ /**
+ * Helper method which retries the provided operation in case of a failure.
+ *
+ * @param resultFuture to complete
+ * @param operation to retry
+ * @param retries until giving up
+ * @param executor to run the futures
+ * @param <T> type of the future's result
+ */
+ private static <T> void retryOperation(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Executor executor) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationFuture = operation.get();
+
+ operationFuture.whenCompleteAsync(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ if (retries > 0) {
+ retryOperation(
+ resultFuture,
+ operation,
+ retries - 1,
+ executor);
+ } else {
+ resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ }
+ }
+ } else {
+ resultFuture.complete(t);
+ }
+ },
+ executor);
+
+ resultFuture.whenComplete(
+ (t, throwable) -> operationFuture.cancel(false));
}
+ }
+
+ /**
+ * Retry the given operation with the given delay in between failures.
+ *
+ * @param operation to retry
+ * @param retries number of retries
+ * @param retryDelay delay between retries
+ * @param scheduledExecutor executor to be used for the retry operation
+ * @param <T> type of the result
+ * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
+ */
+ public static <T> CompletableFuture<T> retryWithDelay(
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor) {
+
+ final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+ retryOperationWithDelay(
+ resultFuture,
+ operation,
+ retries,
+ retryDelay,
+ scheduledExecutor);
+
+ return resultFuture;
+ }
- return operationResultFuture.handleAsync(
- (t, throwable) -> {
- if (throwable != null) {
- if (retries > 0) {
- return retry(operation, retries - 1, executor);
+ private static <T> void retryOperationWithDelay(
+ final CompletableFuture<T> resultFuture,
+ final Supplier<CompletableFuture<T>> operation,
+ final int retries,
+ final Time retryDelay,
+ final ScheduledExecutor scheduledExecutor) {
+
+ if (!resultFuture.isDone()) {
+ final CompletableFuture<T> operationResultFuture = operation.get();
+
+ operationResultFuture.whenCompleteAsync(
+ (t, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof CancellationException) {
+ resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+ } else {
+ if (retries > 0) {
+ final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+ () -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor),
+ retryDelay.toMilliseconds(),
+ TimeUnit.MILLISECONDS);
+
+ resultFuture.whenComplete(
+ (innerT, innerThrowable) -> scheduledFuture.cancel(false));
+ } else {
+ resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+ "has been exhausted.", throwable));
+ }
+ }
} else {
- return FutureUtils.<T>completedExceptionally(new RetryException("Could not complete the operation. Number of retries " +
- "has been exhausted.", throwable));
+ resultFuture.complete(t);
}
- } else {
- return CompletableFuture.completedFuture(t);
- }
- },
- executor)
- .thenCompose(value -> value);
+ },
+ scheduledExecutor);
+
+ resultFuture.whenComplete(
+ (t, throwable) -> operationResultFuture.cancel(false));
+ }
}
+ /**
+ * Exception with which the returned future is completed if the {@link #retry(Supplier, int, Executor)}
+ * operation fails.
+ */
public static class RetryException extends Exception {
private static final long serialVersionUID = 3613470781274141862L;
@@ -109,14 +212,14 @@ public class FutureUtils {
// ------------------------------------------------------------------------
/**
- * Creates a future that is complete once multiple other futures completed.
+ * Creates a future that is complete once multiple other futures completed.
* The future fails (completes exceptionally) once one of the futures in the
* conjunction fails. Upon successful completion, the future returns the
* collection of the futures' results.
*
* <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
- * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
- *
+ * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+ *
* @param futures The futures that make up the conjunction. No null entries are allowed.
* @return The ConjunctFuture that completes once all given futures are complete (or one fails).
*/
@@ -158,7 +261,7 @@ public class FutureUtils {
* A future that is complete once multiple other futures completed. The futures are not
* necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
* one of the Futures in the conjunction fails.
- *
+ *
* <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
* {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} )}) is that ConjunctFuture
* also tracks how many of the Futures are already complete.
@@ -183,16 +286,16 @@ public class FutureUtils {
*/
private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
- /** The total number of futures in the conjunction */
+ /** The total number of futures in the conjunction. */
private final int numTotal;
- /** The next free index in the results arrays */
+ /** The next free index in the results arrays. */
private final AtomicInteger nextIndex = new AtomicInteger(0);
- /** The number of futures in the conjunction that are already complete */
+ /** The number of futures in the conjunction that are already complete. */
private final AtomicInteger numCompleted = new AtomicInteger(0);
- /** The set of collected results so far */
+ /** The set of collected results so far. */
private volatile T[] results;
/** The function that is attached to all futures in the conjunction. Once a future
@@ -215,7 +318,7 @@ public class FutureUtils {
@SuppressWarnings("unchecked")
ResultConjunctFuture(int numTotal) {
this.numTotal = numTotal;
- results = (T[])new Object[numTotal];
+ results = (T[]) new Object[numTotal];
}
@Override
@@ -235,13 +338,13 @@ public class FutureUtils {
*/
private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
- /** Number of completed futures */
+ /** Number of completed futures. */
private final AtomicInteger numCompleted = new AtomicInteger(0);
- /** Total number of futures to wait on */
+ /** Total number of futures to wait on. */
private final int numTotal;
- /** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
+ /** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl. */
private void handleCompletedFuture(Object ignored, Throwable throwable) {
if (throwable == null) {
if (numTotal == numCompleted.incrementAndGet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
new file mode 100644
index 0000000..f92504e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ConjunctFuture} and {@link FutureUtils.WaitingConjunctFuture}.
+ */
+@RunWith(Parameterized.class)
+public class ConjunctFutureTest extends TestLogger {
+
+ @Parameterized.Parameters
+ public static Collection<FutureFactory> parameters (){
+ return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
+ }
+
+ @Parameterized.Parameter
+ public FutureFactory futureFactory;
+
+ @Test
+ public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+ try {
+ futureFactory.createFuture(null);
+ fail();
+ } catch (NullPointerException ignored) {}
+
+ try {
+ futureFactory.createFuture(Arrays.asList(
+ new CompletableFuture<>(),
+ null,
+ new CompletableFuture<>()));
+ fail();
+ } catch (NullPointerException ignored) {}
+ }
+
+ @Test
+ public void testConjunctFutureCompletion() throws Exception {
+ // some futures that we combine
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+ // some future is initially completed
+ future2.complete(new Object());
+
+ // build the conjunct future
+ ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+
+ CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+ assertEquals(4, result.getNumFuturesTotal());
+ assertEquals(1, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete two more futures
+ future4.complete(new Object());
+ assertEquals(2, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ future1.complete(new Object());
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete one future again
+ future1.complete(new Object());
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ // complete the final future
+ future3.complete(new Object());
+ assertEquals(4, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+ }
+
+ @Test
+ public void testConjunctFutureFailureOnFirst() throws Exception {
+
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+ // build the conjunct future
+ ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+
+ CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+ assertEquals(4, result.getNumFuturesTotal());
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertFalse(result.isDone());
+ assertFalse(resultMapped.isDone());
+
+ future2.completeExceptionally(new IOException());
+
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+
+ try {
+ result.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+
+ try {
+ resultMapped.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+ }
+
+ @Test
+ public void testConjunctFutureFailureOnSuccessive() throws Exception {
+
+ java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+ java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+ // build the conjunct future
+ ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+ assertEquals(4, result.getNumFuturesTotal());
+
+ java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+ future1.complete(new Object());
+ future3.complete(new Object());
+ future4.complete(new Object());
+
+ future2.completeExceptionally(new IOException());
+
+ assertEquals(3, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ assertTrue(resultMapped.isDone());
+
+ try {
+ result.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+
+ try {
+ resultMapped.get();
+ fail();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause() instanceof IOException);
+ }
+ }
+
+ /**
+ * Tests that the conjunct future returns upon completion the collection of all future values
+ */
+ @Test
+ public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
+ java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
+ java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
+ java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
+
+ ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
+
+ assertFalse(result.isDone());
+
+ future3.complete(.1);
+
+ assertTrue(result.isDone());
+
+ assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+ }
+
+ @Test
+ public void testConjunctOfNone() throws Exception {
+ final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
+
+ assertEquals(0, result.getNumFuturesTotal());
+ assertEquals(0, result.getNumFuturesCompleted());
+ assertTrue(result.isDone());
+ }
+
+ /**
+ * Factory to create {@link ConjunctFuture} for testing.
+ */
+ private interface FutureFactory {
+ ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
+ }
+
+ private static class ConjunctFutureFactory implements FutureFactory {
+
+ @Override
+ public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+ return FutureUtils.combineAll(futures);
+ }
+ }
+
+ private static class WaitingFutureFactory implements FutureFactory {
+
+ @Override
+ public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+ return FutureUtils.waitForAll(futures);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index cc95e7a..c624ef2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -18,225 +18,208 @@
package org.apache.flink.runtime.concurrent;
-import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
-
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
/**
- * Tests for the utility methods in {@link FutureUtils}
+ * Tests for the utility methods in {@link FutureUtils}.
*/
-@RunWith(Parameterized.class)
-public class FutureUtilsTest extends TestLogger{
-
- @Parameterized.Parameters
- public static Collection<FutureFactory> parameters (){
- return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
- }
-
- @Parameterized.Parameter
- public FutureFactory futureFactory;
-
- @Test
- public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
- try {
- futureFactory.createFuture(null);
- fail();
- } catch (NullPointerException ignored) {}
-
- try {
- futureFactory.createFuture(Arrays.asList(
- new CompletableFuture<>(),
- null,
- new CompletableFuture<>()));
- fail();
- } catch (NullPointerException ignored) {}
- }
+public class FutureUtilsTest extends TestLogger {
+ /**
+ * Tests that we can retry an operation.
+ */
@Test
- public void testConjunctFutureCompletion() throws Exception {
- // some futures that we combine
- java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
- // some future is initially completed
- future2.complete(new Object());
-
- // build the conjunct future
- ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
-
- CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
- assertEquals(4, result.getNumFuturesTotal());
- assertEquals(1, result.getNumFuturesCompleted());
- assertFalse(result.isDone());
- assertFalse(resultMapped.isDone());
-
- // complete two more futures
- future4.complete(new Object());
- assertEquals(2, result.getNumFuturesCompleted());
- assertFalse(result.isDone());
- assertFalse(resultMapped.isDone());
-
- future1.complete(new Object());
- assertEquals(3, result.getNumFuturesCompleted());
- assertFalse(result.isDone());
- assertFalse(resultMapped.isDone());
-
- // complete one future again
- future1.complete(new Object());
- assertEquals(3, result.getNumFuturesCompleted());
- assertFalse(result.isDone());
- assertFalse(resultMapped.isDone());
-
- // complete the final future
- future3.complete(new Object());
- assertEquals(4, result.getNumFuturesCompleted());
- assertTrue(result.isDone());
- assertTrue(resultMapped.isDone());
+ public void testRetrySuccess() throws Exception {
+ final int retries = 10;
+ final AtomicInteger atomicInteger = new AtomicInteger(0);
+ CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
+ () ->
+ CompletableFuture.supplyAsync(
+ () -> {
+ if (atomicInteger.incrementAndGet() == retries) {
+ return true;
+ } else {
+ throw new FlinkFutureException("Test exception");
+ }
+ },
+ TestingUtils.defaultExecutor()),
+ retries,
+ TestingUtils.defaultExecutor());
+
+ assertTrue(retryFuture.get());
+ assertTrue(retries == atomicInteger.get());
}
- @Test
- public void testConjunctFutureFailureOnFirst() throws Exception {
-
- java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
- // build the conjunct future
- ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
-
- CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
- assertEquals(4, result.getNumFuturesTotal());
- assertEquals(0, result.getNumFuturesCompleted());
- assertFalse(result.isDone());
- assertFalse(resultMapped.isDone());
-
- future2.completeExceptionally(new IOException());
-
- assertEquals(0, result.getNumFuturesCompleted());
- assertTrue(result.isDone());
- assertTrue(resultMapped.isDone());
+ /**
+ * Tests that a retry future is failed after all retries have been consumed.
+ */
+ @Test(expected = FutureUtils.RetryException.class)
+ public void testRetryFailure() throws Throwable {
+ final int retries = 3;
- try {
- result.get();
- fail();
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IOException);
- }
+ CompletableFuture<?> retryFuture = FutureUtils.retry(
+ () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+ retries,
+ TestingUtils.defaultExecutor());
try {
- resultMapped.get();
- fail();
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IOException);
+ retryFuture.get();
+ } catch (ExecutionException ee) {
+ throw ExceptionUtils.stripExecutionException(ee);
}
}
+ /**
+ * Tests that we can cancel a retry future.
+ */
@Test
- public void testConjunctFutureFailureOnSuccessive() throws Exception {
-
- java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
- java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
- // build the conjunct future
- ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
- assertEquals(4, result.getNumFuturesTotal());
-
- java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
- future1.complete(new Object());
- future3.complete(new Object());
- future4.complete(new Object());
-
- future2.completeExceptionally(new IOException());
-
- assertEquals(3, result.getNumFuturesCompleted());
- assertTrue(result.isDone());
- assertTrue(resultMapped.isDone());
-
- try {
- result.get();
- fail();
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IOException);
- }
-
- try {
- resultMapped.get();
- fail();
- } catch (ExecutionException e) {
- assertTrue(e.getCause() instanceof IOException);
+ public void testRetryCancellation() throws Exception {
+ final int retries = 10;
+ final AtomicInteger atomicInteger = new AtomicInteger(0);
+ final OneShotLatch notificationLatch = new OneShotLatch();
+ final OneShotLatch waitLatch = new OneShotLatch();
+ final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+ CompletableFuture<?> retryFuture = FutureUtils.retry(
+ () ->
+ CompletableFuture.supplyAsync(
+ () -> {
+ if (atomicInteger.incrementAndGet() == 2) {
+ notificationLatch.trigger();
+ try {
+ waitLatch.await();
+ } catch (InterruptedException e) {
+ atomicThrowable.compareAndSet(null, e);
+ }
+ }
+
+ throw new FlinkFutureException("Test exception");
+ },
+ TestingUtils.defaultExecutor()),
+ retries,
+ TestingUtils.defaultExecutor());
+
+ // await that we have failed once
+ notificationLatch.await();
+
+ assertFalse(retryFuture.isDone());
+
+ // cancel the retry future
+ retryFuture.cancel(false);
+
+ // let the retry operation continue
+ waitLatch.trigger();
+
+ assertTrue(retryFuture.isCancelled());
+ assertEquals(2, atomicInteger.get());
+
+ if (atomicThrowable.get() != null) {
+ throw new FlinkException("Exception occurred in the retry operation.", atomicThrowable.get());
}
}
/**
- * Tests that the conjunct future returns upon completion the collection of all future values
+ * Tests that retry with delay fails after having exceeded all retries.
*/
- @Test
- public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
- java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
- java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
- java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
-
- ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
-
- assertFalse(result.isDone());
-
- future3.complete(.1);
-
- assertTrue(result.isDone());
+ @Test(expected = FutureUtils.RetryException.class)
+ public void testRetryWithDelayFailure() throws Throwable {
+ CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
+ () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+ 3,
+ Time.milliseconds(1L),
+ TestingUtils.defaultScheduledExecutor());
- assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+ try {
+ retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
+ } catch (ExecutionException ee) {
+ throw ExceptionUtils.stripExecutionException(ee);
+ }
}
+ /**
+ * Tests that the delay is respected between subsequent retries of a retry future with retry delay.
+ */
@Test
- public void testConjunctOfNone() throws Exception {
- final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
-
- assertEquals(0, result.getNumFuturesTotal());
- assertEquals(0, result.getNumFuturesCompleted());
- assertTrue(result.isDone());
+ public void testRetryWithDelay() throws Exception {
+ final int retries = 4;
+ final Time delay = Time.milliseconds(50L);
+ final AtomicInteger countDown = new AtomicInteger(retries);
+
+ CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
+ () -> {
+ if (countDown.getAndDecrement() == 0) {
+ return CompletableFuture.completedFuture(true);
+ } else {
+ return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
+ }
+ },
+ retries,
+ delay,
+ TestingUtils.defaultScheduledExecutor());
+
+ long start = System.currentTimeMillis();
+
+ Boolean result = retryFuture.get();
+
+ long completionTime = System.currentTimeMillis() - start;
+
+ assertTrue(result);
+ assertTrue("The completion time should be at least rertries times delay between retries.", completionTime >= retries * delay.toMilliseconds());
}
/**
- * Factory to create {@link ConjunctFuture} for testing.
+ * Tests that all scheduled tasks are canceled if the retry future is being cancelled.
*/
- private interface FutureFactory {
- ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
- }
-
- private static class ConjunctFutureFactory implements FutureFactory {
-
- @Override
- public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
- return FutureUtils.combineAll(futures);
- }
- }
-
- private static class WaitingFutureFactory implements FutureFactory {
-
- @Override
- public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
- return FutureUtils.waitForAll(futures);
- }
+ @Test
+ public void testRetryWithDelayCancellation() {
+ ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
+ ScheduledExecutor scheduledExecutorMock = mock(ScheduledExecutor.class);
+ doReturn(scheduledFutureMock).when(scheduledExecutorMock).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+ doAnswer(
+ (InvocationOnMock invocation) -> {
+ invocation.getArgumentAt(0, Runnable.class).run();
+ return null;
+ }).when(scheduledExecutorMock).execute(any(Runnable.class));
+
+ CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
+ () -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+ 1,
+ TestingUtils.infiniteTime(),
+ scheduledExecutorMock);
+
+ assertFalse(retryFuture.isDone());
+
+ verify(scheduledExecutorMock).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+ retryFuture.cancel(false);
+
+ assertTrue(retryFuture.isCancelled());
+ verify(scheduledFutureMock).cancel(anyBoolean());
}
}