You are viewing a plain-text version of this content. The canonical link to it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 14:23:34 UTC

[flink] branch release-1.11 updated: [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 1dee87a  [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke
1dee87a is described below

commit 1dee87ae58cae3d11570e3b6bb9f45aa893a0c69
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 27 23:07:44 2020 +0200

    [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 18 +++++++-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 27 +++++++++++-
 .../runtime/tasks/SourceStreamTaskTest.java        | 49 ++++++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java       | 14 +++++--
 4 files changed, 101 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 405aff7..c28eee5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -152,7 +152,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			}
 		}
 		finally {
-			sourceThread.interrupt();
+			if (sourceThread.isAlive()) {
+				sourceThread.interrupt();
+			} else if (!sourceThread.getCompletionFuture().isDone()) {
+				// source thread didn't start
+				sourceThread.getCompletionFuture().complete(null);
+			}
 		}
 	}
 
@@ -162,6 +167,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 		cancelTask();
 	}
 
+	@Override
+	protected CompletableFuture<Void> getCompletionFuture() {
+		return sourceThread.getCompletionFuture();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing
 	// ------------------------------------------------------------------------
@@ -212,8 +222,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			setName("Legacy Source Thread - " + taskDescription);
 		}
 
+		/**
+		 * @return future that is completed once this thread completes. If the owning task is failing (see {@link #isFailing()})
+		 * and this thread is not alive (e.g. it was never started), returns a new, already normally completed future instead.
+		 */
 		CompletableFuture<Void> getCompletionFuture() {
-			return completionFuture;
+			return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;
 		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a90c5f2..1246b21 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -90,6 +90,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -199,6 +200,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** Flag to mark this task as canceled. */
 	private volatile boolean canceled;
 
+	/** Flag to mark this task as failing, i.e. an exception has occurred inside {@link #invoke()} while the task was not canceled. */
+	private volatile boolean failing;
+
 	private boolean disposedOperators;
 
 	/** Thread pool for async snapshot workers. */
@@ -540,6 +544,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			afterInvoke();
 		}
 		catch (Exception invokeException) {
+			failing = !canceled;
 			try {
 				cleanUpInvoke();
 			}
@@ -562,6 +567,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	protected void afterInvoke() throws Exception {
 		LOG.debug("Finished task {}", getName());
+		getCompletionFuture().exceptionally(unused -> null).join();
 
 		final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
 
@@ -600,6 +606,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	protected void cleanUpInvoke() throws Exception {
+		getCompletionFuture().exceptionally(unused -> null).join();
 		// clean up everything we initialized
 		isRunning = false;
 
@@ -655,6 +662,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		mailboxProcessor.close();
 	}
 
+	protected CompletableFuture<Void> getCompletionFuture() {
+		return FutureUtils.completedVoidFuture();
+	}
+
 	@Override
 	public final void cancel() throws Exception {
 		isRunning = false;
@@ -666,8 +677,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			cancelTask();
 		}
 		finally {
-			mailboxProcessor.allActionsCompleted();
-			cancelables.close();
+			getCompletionFuture()
+				.whenComplete((unusedResult, unusedError) -> {
+					// WARN: the method is called from the task thread but the callback can be invoked from a different thread
+					mailboxProcessor.allActionsCompleted();
+					try {
+						cancelables.close();
+					} catch (IOException e) {
+						throw new CompletionException(e);
+					}
+				});
 		}
 	}
 
@@ -683,6 +702,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		return canceled;
 	}
 
+	public final boolean isFailing() {
+		return failing;
+	}
+
 	private void shutdownAsyncThreads() throws Exception {
 		if (!asyncOperationsThreadPool.isShutdown()) {
 			asyncOperationsThreadPool.shutdownNow();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 27c2576..deb5a62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -506,6 +506,24 @@ public class SourceStreamTaskTest {
 		testHarness.waitForTaskCompletion();
 	}
 
+	@Test
+	public void testWaitsForSourceThreadOnCancel() throws Exception {
+		StreamTaskTestHarness<String> harness = new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+
+		harness.setupOutputForSingletonOperatorChain();
+		harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));
+
+		harness.invoke();
+		NonStoppingSource.waitForStart();
+
+		harness.getTask().cancel();
+		harness.waitForTaskCompletion(500, true); // don't throw on CancelTaskException if the task exited early; the assert below detects that
+		assertTrue(harness.taskThread.isAlive());
+
+		NonStoppingSource.forceCancel(); // only now let the source thread (and thus the task) finish
+		harness.waitForTaskCompletion(Long.MAX_VALUE, true);
+	}
+
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
@@ -612,6 +630,37 @@ public class SourceStreamTaskTest {
 		}
 	}
 
+	private static class NonStoppingSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+		private static volatile boolean running = true; // volatile: written by the test thread, polled by the source thread
+		private static final CompletableFuture<Void> startFuture = new CompletableFuture<>();
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			startFuture.complete(null);
+			while (running) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException ignored) {
+					// deliberately ignored: this source must stop only via forceCancel(), not via task interruption
+				}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			// do nothing - ignore usual cancellation so the test can check that the task waits for this source thread
+		}
+
+		static void forceCancel() {
+			running = false;
+		}
+
+		static void waitForStart() {
+			startFuture.join();
+		}
+	}
+
 	private static class OpenCloseTestSource extends RichSourceFunction<String> {
 		private static final long serialVersionUID = 1L;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 3d3e034..21c98eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -53,6 +54,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.SupplierWithException;
@@ -273,18 +275,24 @@ public class StreamTaskTestHarness<OUT> {
 		waitForTaskCompletion(Long.MAX_VALUE);
 	}
 
+	public void waitForTaskCompletion(long timeout) throws Exception {
+		waitForTaskCompletion(timeout, false);
+	}
+
 	/**
 	 * Waits for the task completion. If this does not happen within the timeout, then a
 	 * TimeoutException is thrown.
 	 *
 	 * @param timeout Timeout for the task completion
 	 */
-	public void waitForTaskCompletion(long timeout) throws Exception {
+	public void waitForTaskCompletion(long timeout, boolean ignoreCancellationException) throws Exception {
 		Preconditions.checkState(taskThread != null, "Task thread was not started.");
 
 		taskThread.join(timeout);
 		if (taskThread.getError() != null) {
-			throw new Exception("error in task", taskThread.getError());
+			if (!ignoreCancellationException || !ExceptionUtils.findThrowable(taskThread.getError(), CancelTaskException.class).isPresent()) {
+				throw new Exception("error in task", taskThread.getError());
+			}
 		}
 	}
 
@@ -439,7 +447,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	// ------------------------------------------------------------------------
 
-	private class TaskThread extends Thread {
+	class TaskThread extends Thread {
 
 		private final SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory;
 		private volatile StreamTask<OUT, ?> task;