You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/09/06 08:19:17 UTC

flink git commit: [FLINK-7576] [futures] Add FutureUtils.retryWithDelay

Repository: flink
Updated Branches:
  refs/heads/master 6d2124ee2 -> 769ce2a07


[FLINK-7576] [futures] Add FutureUtils.retryWithDelay

FutureUtils.retryWithDelay executes the given operation of type
Callable<CompletableFuture<T>> n times and waits in between retries the given
delay. This allows to retry an operation with a specified delay.

Make retry and retry with delay future properly cancellable

This closes #4637.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/769ce2a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/769ce2a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/769ce2a0

Branch: refs/heads/master
Commit: 769ce2a07424712d62b6ead87c10aee08d85c216
Parents: 6d2124e
Author: Till Rohrmann <tr...@apache.org>
Authored: Mon Sep 4 16:42:24 2017 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Sep 6 10:18:52 2017 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 171 +++++++--
 .../runtime/concurrent/ConjunctFutureTest.java  | 246 +++++++++++++
 .../runtime/concurrent/FutureUtilsTest.java     | 359 +++++++++----------
 3 files changed, 554 insertions(+), 222 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 5c6439d..b982c8e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -26,13 +26,15 @@ import akka.dispatch.OnComplete;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Supplier;
 
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -48,6 +50,7 @@ public class FutureUtils {
 	//  retrying operations
 	// ------------------------------------------------------------------------
 
+
 	/**
 	 * Retry the given operation the given number of times in case of a failure.
 	 *
@@ -58,35 +61,135 @@ public class FutureUtils {
 	 * @return Future containing either the result of the operation or a {@link RetryException}
 	 */
 	public static <T> CompletableFuture<T> retry(
-		final Callable<CompletableFuture<T>> operation,
-		final int retries,
-		final Executor executor) {
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Executor executor) {
+
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
 
-		CompletableFuture<T> operationResultFuture;
+		retryOperation(resultFuture, operation, retries, executor);
+
+		return resultFuture;
+	}
 
-		try {
-			operationResultFuture = operation.call();
-		} catch (Exception e) {
-			return FutureUtils.completedExceptionally(new RetryException("Could not execute the provided operation.", e));
+	/**
+	 * Helper method which retries the provided operation in case of a failure.
+	 *
+	 * @param resultFuture to complete
+	 * @param operation to retry
+	 * @param retries until giving up
+	 * @param executor to run the futures
+	 * @param <T> type of the future's result
+	 */
+	private static <T> void retryOperation(
+			final CompletableFuture<T> resultFuture,
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Executor executor) {
+
+		if (!resultFuture.isDone()) {
+			final CompletableFuture<T> operationFuture = operation.get();
+
+			operationFuture.whenCompleteAsync(
+				(t, throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof CancellationException) {
+							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+						} else {
+							if (retries > 0) {
+								retryOperation(
+									resultFuture,
+									operation,
+									retries - 1,
+									executor);
+							} else {
+								resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+									"has been exhausted.", throwable));
+							}
+						}
+					} else {
+						resultFuture.complete(t);
+					}
+				},
+				executor);
+
+			resultFuture.whenComplete(
+				(t, throwable) -> operationFuture.cancel(false));
 		}
+	}
+
+	/**
+	 * Retry the given operation with the given delay in between failures.
+	 *
+	 * @param operation to retry
+	 * @param retries number of retries
+	 * @param retryDelay delay between retries
+	 * @param scheduledExecutor executor to be used for the retry operation
+	 * @param <T> type of the result
+	 * @return Future which retries the given operation a given amount of times and delays the retry in case of failures
+	 */
+	public static <T> CompletableFuture<T> retryWithDelay(
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor) {
+
+		final CompletableFuture<T> resultFuture = new CompletableFuture<>();
+
+		retryOperationWithDelay(
+			resultFuture,
+			operation,
+			retries,
+			retryDelay,
+			scheduledExecutor);
+
+		return resultFuture;
+	}
 
-		return operationResultFuture.handleAsync(
-			(t, throwable) -> {
-				if (throwable != null) {
-					if (retries > 0) {
-						return retry(operation, retries - 1, executor);
+	private static <T> void retryOperationWithDelay(
+			final CompletableFuture<T> resultFuture,
+			final Supplier<CompletableFuture<T>> operation,
+			final int retries,
+			final Time retryDelay,
+			final ScheduledExecutor scheduledExecutor) {
+
+		if (!resultFuture.isDone()) {
+			final CompletableFuture<T> operationResultFuture = operation.get();
+
+			operationResultFuture.whenCompleteAsync(
+				(t, throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof CancellationException) {
+							resultFuture.completeExceptionally(new RetryException("Operation future was cancelled.", throwable));
+						} else {
+							if (retries > 0) {
+								final ScheduledFuture<?> scheduledFuture = scheduledExecutor.schedule(
+									() -> retryOperationWithDelay(resultFuture, operation, retries - 1, retryDelay, scheduledExecutor),
+									retryDelay.toMilliseconds(),
+									TimeUnit.MILLISECONDS);
+
+								resultFuture.whenComplete(
+									(innerT, innerThrowable) -> scheduledFuture.cancel(false));
+							} else {
+								resultFuture.completeExceptionally(new RetryException("Could not complete the operation. Number of retries " +
+									"has been exhausted.", throwable));
+							}
+						}
 					} else {
-						return FutureUtils.<T>completedExceptionally(new RetryException("Could not complete the operation. Number of retries " +
-							"has been exhausted.", throwable));
+						resultFuture.complete(t);
 					}
-				} else {
-					return CompletableFuture.completedFuture(t);
-				}
-			},
-			executor)
-		.thenCompose(value -> value);
+				},
+				scheduledExecutor);
+
+			resultFuture.whenComplete(
+				(t, throwable) -> operationResultFuture.cancel(false));
+		}
 	}
 
+	/**
+	 * Exception with which the returned future is completed if the {@link #retry(Supplier, int, Executor)}
+	 * operation fails.
+	 */
 	public static class RetryException extends Exception {
 
 		private static final long serialVersionUID = 3613470781274141862L;
@@ -109,14 +212,14 @@ public class FutureUtils {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a future that is complete once multiple other futures completed. 
+	 * Creates a future that is complete once multiple other futures completed.
 	 * The future fails (completes exceptionally) once one of the futures in the
 	 * conjunction fails. Upon successful completion, the future returns the
 	 * collection of the futures' results.
 	 *
 	 * <p>The ConjunctFuture gives access to how many Futures in the conjunction have already
-	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}. 
-	 * 
+	 * completed successfully, via {@link ConjunctFuture#getNumFuturesCompleted()}.
+	 *
 	 * @param futures The futures that make up the conjunction. No null entries are allowed.
 	 * @return The ConjunctFuture that completes once all given futures are complete (or one fails).
 	 */
@@ -158,7 +261,7 @@ public class FutureUtils {
 	 * A future that is complete once multiple other futures completed. The futures are not
 	 * necessarily of the same type. The ConjunctFuture fails (completes exceptionally) once
 	 * one of the Futures in the conjunction fails.
-	 * 
+	 *
 	 * <p>The advantage of using the ConjunctFuture over chaining all the futures (such as via
 	 * {@link CompletableFuture#thenCombine(CompletionStage, BiFunction)} )}) is that ConjunctFuture
 	 * also tracks how many of the Futures are already complete.
@@ -183,16 +286,16 @@ public class FutureUtils {
 	 */
 	private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
 
-		/** The total number of futures in the conjunction */
+		/** The total number of futures in the conjunction. */
 		private final int numTotal;
 
-		/** The next free index in the results arrays */
+		/** The next free index in the results arrays. */
 		private final AtomicInteger nextIndex = new AtomicInteger(0);
 
-		/** The number of futures in the conjunction that are already complete */
+		/** The number of futures in the conjunction that are already complete. */
 		private final AtomicInteger numCompleted = new AtomicInteger(0);
 
-		/** The set of collected results so far */
+		/** The set of collected results so far. */
 		private volatile T[] results;
 
 		/** The function that is attached to all futures in the conjunction. Once a future
@@ -215,7 +318,7 @@ public class FutureUtils {
 		@SuppressWarnings("unchecked")
 		ResultConjunctFuture(int numTotal) {
 			this.numTotal = numTotal;
-			results = (T[])new Object[numTotal];
+			results = (T[]) new Object[numTotal];
 		}
 
 		@Override
@@ -235,13 +338,13 @@ public class FutureUtils {
 	 */
 	private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
 
-		/** Number of completed futures */
+		/** Number of completed futures. */
 		private final AtomicInteger numCompleted = new AtomicInteger(0);
 
-		/** Total number of futures to wait on */
+		/** Total number of futures to wait on. */
 		private final int numTotal;
 
-		/** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl */
+		/** Method which increments the atomic completion counter and completes or fails the WaitingFutureImpl. */
 		private void handleCompletedFuture(Object ignored, Throwable throwable) {
 			if (throwable == null) {
 				if (numTotal == numCompleted.incrementAndGet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
new file mode 100644
index 0000000..f92504e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/ConjunctFutureTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.util.TestLogger;
+
+import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link ConjunctFuture} and {@link FutureUtils.WaitingConjunctFuture}.
+ */
+@RunWith(Parameterized.class)
+public class ConjunctFutureTest extends TestLogger {
+
+	@Parameterized.Parameters
+	public static Collection<FutureFactory> parameters (){
+		return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
+	}
+
+	@Parameterized.Parameter
+	public FutureFactory futureFactory;
+
+	@Test
+	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
+		try {
+			futureFactory.createFuture(null);
+			fail();
+		} catch (NullPointerException ignored) {}
+
+		try {
+			futureFactory.createFuture(Arrays.asList(
+					new CompletableFuture<>(),
+					null,
+					new CompletableFuture<>()));
+			fail();
+		} catch (NullPointerException ignored) {}
+	}
+
+	@Test
+	public void testConjunctFutureCompletion() throws Exception {
+		// some futures that we combine
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+		// some future is initially completed
+		future2.complete(new Object());
+
+		// build the conjunct future
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+
+		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+		assertEquals(4, result.getNumFuturesTotal());
+		assertEquals(1, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete two more futures
+		future4.complete(new Object());
+		assertEquals(2, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		future1.complete(new Object());
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete one future again
+		future1.complete(new Object());
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		// complete the final future
+		future3.complete(new Object());
+		assertEquals(4, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+	}
+
+	@Test
+	public void testConjunctFutureFailureOnFirst() throws Exception {
+
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+		// build the conjunct future
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+
+		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+		assertEquals(4, result.getNumFuturesTotal());
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertFalse(result.isDone());
+		assertFalse(resultMapped.isDone());
+
+		future2.completeExceptionally(new IOException());
+
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+
+		try {
+			result.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+
+		try {
+			resultMapped.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+	}
+
+	@Test
+	public void testConjunctFutureFailureOnSuccessive() throws Exception {
+
+		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
+		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
+
+		// build the conjunct future
+		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
+		assertEquals(4, result.getNumFuturesTotal());
+
+		java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
+
+		future1.complete(new Object());
+		future3.complete(new Object());
+		future4.complete(new Object());
+
+		future2.completeExceptionally(new IOException());
+
+		assertEquals(3, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+		assertTrue(resultMapped.isDone());
+
+		try {
+			result.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+
+		try {
+			resultMapped.get();
+			fail();
+		} catch (ExecutionException e) {
+			assertTrue(e.getCause() instanceof IOException);
+		}
+	}
+
+	/**
+	 * Tests that the conjunct future returns upon completion the collection of all future values
+	 */
+	@Test
+	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
+		java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
+		java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
+		java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
+
+		ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
+
+		assertFalse(result.isDone());
+
+		future3.complete(.1);
+
+		assertTrue(result.isDone());
+
+		assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+	}
+
+	@Test
+	public void testConjunctOfNone() throws Exception {
+		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
+
+		assertEquals(0, result.getNumFuturesTotal());
+		assertEquals(0, result.getNumFuturesCompleted());
+		assertTrue(result.isDone());
+	}
+
+	/**
+	 * Factory to create {@link ConjunctFuture} for testing.
+	 */
+	private interface FutureFactory {
+		ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
+	}
+
+	private static class ConjunctFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+			return FutureUtils.combineAll(futures);
+		}
+	}
+
+	private static class WaitingFutureFactory implements FutureFactory {
+
+		@Override
+		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
+			return FutureUtils.waitForAll(futures);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/769ce2a0/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index cc95e7a..c624ef2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -18,225 +18,208 @@
 
 package org.apache.flink.runtime.concurrent;
 
-import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
-
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
-import org.hamcrest.collection.IsIterableContainingInAnyOrder;
+
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
+import org.mockito.invocation.InvocationOnMock;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-
-import static org.junit.Assert.*;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 /**
- * Tests for the utility methods in {@link FutureUtils}
+ * Tests for the utility methods in {@link FutureUtils}.
  */
-@RunWith(Parameterized.class)
-public class FutureUtilsTest extends TestLogger{
-
-	@Parameterized.Parameters
-	public static Collection<FutureFactory> parameters (){
-		return Arrays.asList(new ConjunctFutureFactory(), new WaitingFutureFactory());
-	}
-
-	@Parameterized.Parameter
-	public FutureFactory futureFactory;
-
-	@Test
-	public void testConjunctFutureFailsOnEmptyAndNull() throws Exception {
-		try {
-			futureFactory.createFuture(null);
-			fail();
-		} catch (NullPointerException ignored) {}
-
-		try {
-			futureFactory.createFuture(Arrays.asList(
-					new CompletableFuture<>(),
-					null,
-					new CompletableFuture<>()));
-			fail();
-		} catch (NullPointerException ignored) {}
-	}
+public class FutureUtilsTest extends TestLogger {
 
+	/**
+	 * Tests that we can retry an operation.
+	 */
 	@Test
-	public void testConjunctFutureCompletion() throws Exception {
-		// some futures that we combine
-		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
-		// some future is initially completed
-		future2.complete(new Object());
-
-		// build the conjunct future
-		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
-
-		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
-		assertEquals(4, result.getNumFuturesTotal());
-		assertEquals(1, result.getNumFuturesCompleted());
-		assertFalse(result.isDone());
-		assertFalse(resultMapped.isDone());
-
-		// complete two more futures
-		future4.complete(new Object());
-		assertEquals(2, result.getNumFuturesCompleted());
-		assertFalse(result.isDone());
-		assertFalse(resultMapped.isDone());
-
-		future1.complete(new Object());
-		assertEquals(3, result.getNumFuturesCompleted());
-		assertFalse(result.isDone());
-		assertFalse(resultMapped.isDone());
-
-		// complete one future again
-		future1.complete(new Object());
-		assertEquals(3, result.getNumFuturesCompleted());
-		assertFalse(result.isDone());
-		assertFalse(resultMapped.isDone());
-
-		// complete the final future
-		future3.complete(new Object());
-		assertEquals(4, result.getNumFuturesCompleted());
-		assertTrue(result.isDone());
-		assertTrue(resultMapped.isDone());
+	public void testRetrySuccess() throws Exception {
+		final int retries = 10;
+		final AtomicInteger atomicInteger = new AtomicInteger(0);
+		CompletableFuture<Boolean> retryFuture = FutureUtils.retry(
+			() ->
+				CompletableFuture.supplyAsync(
+					() -> {
+						if (atomicInteger.incrementAndGet() == retries) {
+							return true;
+						} else {
+							throw new FlinkFutureException("Test exception");
+						}
+					},
+					TestingUtils.defaultExecutor()),
+			retries,
+			TestingUtils.defaultExecutor());
+
+		assertTrue(retryFuture.get());
+		assertTrue(retries == atomicInteger.get());
 	}
 
-	@Test
-	public void testConjunctFutureFailureOnFirst() throws Exception {
-
-		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
-		// build the conjunct future
-		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
-
-		CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
-		assertEquals(4, result.getNumFuturesTotal());
-		assertEquals(0, result.getNumFuturesCompleted());
-		assertFalse(result.isDone());
-		assertFalse(resultMapped.isDone());
-
-		future2.completeExceptionally(new IOException());
-
-		assertEquals(0, result.getNumFuturesCompleted());
-		assertTrue(result.isDone());
-		assertTrue(resultMapped.isDone());
+	/**
+	 * Tests that a retry future is failed after all retries have been consumed.
+	 */
+	@Test(expected = FutureUtils.RetryException.class)
+	public void testRetryFailure() throws Throwable {
+		final int retries = 3;
 
-		try {
-			result.get();
-			fail();
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof IOException);
-		}
+		CompletableFuture<?> retryFuture = FutureUtils.retry(
+			() -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+			retries,
+			TestingUtils.defaultExecutor());
 
 		try {
-			resultMapped.get();
-			fail();
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof IOException);
+			retryFuture.get();
+		} catch (ExecutionException ee) {
+			throw ExceptionUtils.stripExecutionException(ee);
 		}
 	}
 
+	/**
+	 * Tests that we can cancel a retry future.
+	 */
 	@Test
-	public void testConjunctFutureFailureOnSuccessive() throws Exception {
-
-		java.util.concurrent.CompletableFuture<Object> future1 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future2 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future3 = new java.util.concurrent.CompletableFuture<>();
-		java.util.concurrent.CompletableFuture<Object> future4 = new java.util.concurrent.CompletableFuture<>();
-
-		// build the conjunct future
-		ConjunctFuture<?> result = futureFactory.createFuture(Arrays.asList(future1, future2, future3, future4));
-		assertEquals(4, result.getNumFuturesTotal());
-
-		java.util.concurrent.CompletableFuture<?> resultMapped = result.thenAccept(value -> {});
-
-		future1.complete(new Object());
-		future3.complete(new Object());
-		future4.complete(new Object());
-
-		future2.completeExceptionally(new IOException());
-
-		assertEquals(3, result.getNumFuturesCompleted());
-		assertTrue(result.isDone());
-		assertTrue(resultMapped.isDone());
-
-		try {
-			result.get();
-			fail();
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof IOException);
-		}
-
-		try {
-			resultMapped.get();
-			fail();
-		} catch (ExecutionException e) {
-			assertTrue(e.getCause() instanceof IOException);
+	public void testRetryCancellation() throws Exception {
+		final int retries = 10;
+		final AtomicInteger atomicInteger = new AtomicInteger(0);
+		final OneShotLatch notificationLatch = new OneShotLatch();
+		final OneShotLatch waitLatch = new OneShotLatch();
+		final AtomicReference<Throwable> atomicThrowable = new AtomicReference<>(null);
+
+		CompletableFuture<?> retryFuture = FutureUtils.retry(
+			() ->
+				CompletableFuture.supplyAsync(
+					() -> {
+						if (atomicInteger.incrementAndGet() == 2) {
+							notificationLatch.trigger();
+							try {
+								waitLatch.await();
+							} catch (InterruptedException e) {
+								atomicThrowable.compareAndSet(null, e);
+							}
+						}
+
+						throw new FlinkFutureException("Test exception");
+					},
+					TestingUtils.defaultExecutor()),
+			retries,
+			TestingUtils.defaultExecutor());
+
+		// await that we have failed once
+		notificationLatch.await();
+
+		assertFalse(retryFuture.isDone());
+
+		// cancel the retry future
+		retryFuture.cancel(false);
+
+		// let the retry operation continue
+		waitLatch.trigger();
+
+		assertTrue(retryFuture.isCancelled());
+		assertEquals(2, atomicInteger.get());
+
+		if (atomicThrowable.get() != null) {
+			throw new FlinkException("Exception occurred in the retry operation.", atomicThrowable.get());
 		}
 	}
 
 	/**
-	 * Tests that the conjunct future returns upon completion the collection of all future values
+	 * Tests that retry with delay fails after having exceeded all retries.
 	 */
-	@Test
-	public void testConjunctFutureValue() throws ExecutionException, InterruptedException {
-		java.util.concurrent.CompletableFuture<Integer> future1 = java.util.concurrent.CompletableFuture.completedFuture(1);
-		java.util.concurrent.CompletableFuture<Long> future2 = java.util.concurrent.CompletableFuture.completedFuture(2L);
-		java.util.concurrent.CompletableFuture<Double> future3 = new java.util.concurrent.CompletableFuture<>();
-
-		ConjunctFuture<Collection<Number>> result = FutureUtils.combineAll(Arrays.asList(future1, future2, future3));
-
-		assertFalse(result.isDone());
-
-		future3.complete(.1);
-
-		assertTrue(result.isDone());
+	@Test(expected = FutureUtils.RetryException.class)
+	public void testRetryWithDelayFailure() throws Throwable {
+		CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
+			() -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+			3,
+			Time.milliseconds(1L),
+			TestingUtils.defaultScheduledExecutor());
 
-		assertThat(result.get(), IsIterableContainingInAnyOrder.<Number>containsInAnyOrder(1, 2L, .1));
+		try {
+			retryFuture.get(TestingUtils.TIMEOUT().toMilliseconds(), TimeUnit.MILLISECONDS);
+		} catch (ExecutionException ee) {
+			throw ExceptionUtils.stripExecutionException(ee);
+		}
 	}
 
+	/**
+	 * Tests that the delay is respected between subsequent retries of a retry future with retry delay.
+	 */
 	@Test
-	public void testConjunctOfNone() throws Exception {
-		final ConjunctFuture<?> result = futureFactory.createFuture(Collections.<java.util.concurrent.CompletableFuture<Object>>emptyList());
-
-		assertEquals(0, result.getNumFuturesTotal());
-		assertEquals(0, result.getNumFuturesCompleted());
-		assertTrue(result.isDone());
+	public void testRetryWithDelay() throws Exception {
+		final int retries = 4;
+		final Time delay = Time.milliseconds(50L);
+		final AtomicInteger countDown = new AtomicInteger(retries);
+
+		CompletableFuture<Boolean> retryFuture = FutureUtils.retryWithDelay(
+			() -> {
+				if (countDown.getAndDecrement() == 0) {
+					return CompletableFuture.completedFuture(true);
+				} else {
+					return FutureUtils.completedExceptionally(new FlinkException("Test exception."));
+				}
+			},
+			retries,
+			delay,
+			TestingUtils.defaultScheduledExecutor());
+
+		long start = System.currentTimeMillis();
+
+		Boolean result = retryFuture.get();
+
+		long completionTime = System.currentTimeMillis() - start;
+
+		assertTrue(result);
+		assertTrue("The completion time should be at least rertries times delay between retries.", completionTime >= retries * delay.toMilliseconds());
 	}
 
 	/**
-	 * Factory to create {@link ConjunctFuture} for testing.
+	 * Tests that all scheduled tasks are canceled if the retry future is being cancelled.
 	 */
-	private interface FutureFactory {
-		ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures);
-	}
-
-	private static class ConjunctFutureFactory implements FutureFactory {
-
-		@Override
-		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
-			return FutureUtils.combineAll(futures);
-		}
-	}
-
-	private static class WaitingFutureFactory implements FutureFactory {
-
-		@Override
-		public ConjunctFuture<?> createFuture(Collection<? extends java.util.concurrent.CompletableFuture<?>> futures) {
-			return FutureUtils.waitForAll(futures);
-		}
+	@Test
+	public void testRetryWithDelayCancellation() {
+		ScheduledFuture<?> scheduledFutureMock = mock(ScheduledFuture.class);
+		ScheduledExecutor scheduledExecutorMock = mock(ScheduledExecutor.class);
+		doReturn(scheduledFutureMock).when(scheduledExecutorMock).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+		doAnswer(
+			(InvocationOnMock invocation) -> {
+				invocation.getArgumentAt(0, Runnable.class).run();
+				return null;
+			}).when(scheduledExecutorMock).execute(any(Runnable.class));
+
+		CompletableFuture<?> retryFuture = FutureUtils.retryWithDelay(
+			() -> FutureUtils.completedExceptionally(new FlinkException("Test exception")),
+			1,
+			TestingUtils.infiniteTime(),
+			scheduledExecutorMock);
+
+		assertFalse(retryFuture.isDone());
+
+		verify(scheduledExecutorMock).schedule(any(Runnable.class), anyLong(), any(TimeUnit.class));
+
+		retryFuture.cancel(false);
+
+		assertTrue(retryFuture.isCancelled());
+		verify(scheduledFutureMock).cancel(anyBoolean());
 	}
 }