You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/07/23 22:06:05 UTC

[08/11] flink git commit: [FLINK-9909][core] ConjunctFuture does not cancel input futures

[FLINK-9909][core] ConjunctFuture does not cancel input futures

If a ConjunctFuture is cancelled, it no longer cancels its input futures
automatically. If users need this behaviour, they have to implement it
explicitly. The reason for this change is that an implicit cancellation
can have unwanted side effects, because the producers of the cancelled
input futures won't be executed.

This closes #6384.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9afda733
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9afda733
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9afda733

Branch: refs/heads/master
Commit: 9afda733a90a72be75ced9567452c6a7a5e3dc8c
Parents: c897471
Author: Till Rohrmann <tr...@apache.org>
Authored: Sun Jul 22 20:17:11 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Tue Jul 24 00:05:39 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/concurrent/FutureUtils.java   | 40 --------------------
 .../runtime/concurrent/FutureUtilsTest.java     | 34 -----------------
 2 files changed, 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 1cffaab..3a7e800 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -508,22 +508,6 @@ public class FutureUtils {
 		 * @return The number of Futures in the conjunction that are already complete
 		 */
 		public abstract int getNumFuturesCompleted();
-
-		/**
-		 * Gets the individual futures which make up the {@link ConjunctFuture}.
-		 *
-		 * @return Collection of futures which make up the {@link ConjunctFuture}
-		 */
-		protected abstract Collection<? extends CompletableFuture<?>> getConjunctFutures();
-
-		@Override
-		public boolean cancel(boolean mayInterruptIfRunning) {
-			for (CompletableFuture<?> completableFuture : getConjunctFutures()) {
-				completableFuture.cancel(mayInterruptIfRunning);
-			}
-
-			return super.cancel(mayInterruptIfRunning);
-		}
 	}
 
 	/**
@@ -531,8 +515,6 @@ public class FutureUtils {
 	 */
 	private static class ResultConjunctFuture<T> extends ConjunctFuture<Collection<T>> {
 
-		private final Collection<? extends CompletableFuture<? extends T>> resultFutures;
-
 		/** The total number of futures in the conjunction. */
 		private final int numTotal;
 
@@ -564,7 +546,6 @@ public class FutureUtils {
 
 		@SuppressWarnings("unchecked")
 		ResultConjunctFuture(Collection<? extends CompletableFuture<? extends T>> resultFutures) {
-			this.resultFutures = checkNotNull(resultFutures);
 			this.numTotal = resultFutures.size();
 			results = (T[]) new Object[numTotal];
 
@@ -587,11 +568,6 @@ public class FutureUtils {
 		public int getNumFuturesCompleted() {
 			return numCompleted.get();
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return resultFutures;
-		}
 	}
 
 	/**
@@ -600,8 +576,6 @@ public class FutureUtils {
 	 */
 	private static final class WaitingConjunctFuture extends ConjunctFuture<Void> {
 
-		private final Collection<? extends CompletableFuture<?>> futures;
-
 		/** Number of completed futures. */
 		private final AtomicInteger numCompleted = new AtomicInteger(0);
 
@@ -620,7 +594,6 @@ public class FutureUtils {
 		}
 
 		private WaitingConjunctFuture(Collection<? extends CompletableFuture<?>> futures) {
-			this.futures = checkNotNull(futures);
 			this.numTotal = futures.size();
 
 			if (futures.isEmpty()) {
@@ -641,11 +614,6 @@ public class FutureUtils {
 		public int getNumFuturesCompleted() {
 			return numCompleted.get();
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return futures;
-		}
 	}
 
 	/**
@@ -673,14 +641,11 @@ public class FutureUtils {
 
 		private final int numFuturesTotal;
 
-		private final Collection<? extends CompletableFuture<?>> futuresToComplete;
-
 		private int futuresCompleted;
 
 		private Throwable globalThrowable;
 
 		private CompletionConjunctFuture(Collection<? extends CompletableFuture<?>> futuresToComplete) {
-			this.futuresToComplete = checkNotNull(futuresToComplete);
 			numFuturesTotal = futuresToComplete.size();
 
 			futuresCompleted = 0;
@@ -725,11 +690,6 @@ public class FutureUtils {
 				return futuresCompleted;
 			}
 		}
-
-		@Override
-		protected Collection<? extends CompletableFuture<?>> getConjunctFutures() {
-			return futuresToComplete;
-		}
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9afda733/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
index 07bc4c1..1639c91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/concurrent/FutureUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
@@ -42,7 +41,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.hamcrest.CoreMatchers.containsString;
@@ -548,38 +546,6 @@ public class FutureUtilsTest extends TestLogger {
 	}
 
 	@Test
-	public void testCancelWaitingConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.waitForAll(inputFutures));
-	}
-
-	@Test
-	public void testCancelResultConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.combineAll(inputFutures));
-	}
-
-	@Test
-	public void testCancelCompleteConjunctFuture() {
-		cancelConjunctFuture(inputFutures -> FutureUtils.completeAll(inputFutures));
-	}
-
-	private void cancelConjunctFuture(Function<Collection<? extends CompletableFuture<?>>, FutureUtils.ConjunctFuture<?>> conjunctFutureFactory) {
-		final int numInputFutures = 10;
-		final Collection<CompletableFuture<Void>> inputFutures = new ArrayList<>(numInputFutures);
-
-		for (int i = 0; i < numInputFutures; i++) {
-			inputFutures.add(new CompletableFuture<>());
-		}
-
-		final FutureUtils.ConjunctFuture<?> conjunctFuture = conjunctFutureFactory.apply(inputFutures);
-
-		conjunctFuture.cancel(false);
-
-		for (CompletableFuture<Void> inputFuture : inputFutures) {
-			assertThat(inputFuture.isCancelled(), is(true));
-		}
-	}
-
-	@Test
 	public void testSupplyAsyncFailure() throws Exception {
 		final String exceptionMessage = "Test exception";
 		final FlinkException testException = new FlinkException(exceptionMessage);