You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/16 19:22:41 UTC

[flink] branch release-1.10 updated: [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new a20fb4a  [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.
a20fb4a is described below

commit a20fb4a796d8df91e4ebbcdc0b35cdd75145c574
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 27 23:07:44 2020 +0200

    [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.
    
    This closes #13383.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 18 +++++++-
 .../flink/streaming/runtime/tasks/StreamTask.java  | 29 ++++++++++++-
 .../runtime/tasks/SourceStreamTaskTest.java        | 49 ++++++++++++++++++++++
 .../runtime/tasks/StreamTaskTestHarness.java       | 14 +++++--
 4 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 6308c63..cb20cde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -141,7 +141,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			}
 		}
 		finally {
-			sourceThread.interrupt();
+			if (sourceThread.isAlive()) {
+				sourceThread.interrupt();
+			} else if (!sourceThread.getCompletionFuture().isDone()) {
+				// source thread didn't start
+				sourceThread.getCompletionFuture().complete(null);
+			}
 		}
 	}
 
@@ -151,6 +156,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 		cancelTask();
 	}
 
+	@Override
+	protected CompletableFuture<Void> getCompletionFuture() {
+		return sourceThread.getCompletionFuture(); // StreamTask joins this (i.e. the legacy source thread) before finishing/cleaning up invoke()
+	}
+
 	// ------------------------------------------------------------------------
 	//  Checkpointing
 	// ------------------------------------------------------------------------
@@ -209,8 +219,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			setName("Legacy Source Thread - " + taskDescription);
 		}
 
+		/**
+		 * @return a future completed once this thread finishes. If this task {@link #isFailing()} and this thread is not
+		 * alive (e.g. it was never started), an already-completed future is returned instead of the thread's own future.
+		 */
 		CompletableFuture<Void> getCompletionFuture() {
-			return completionFuture;
+			return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;
 		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8e932bf..f276de6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -84,12 +85,14 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -207,6 +210,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	/** Flag to mark this task as canceled. */
 	private volatile boolean canceled;
 
+	/** Set to {@code !canceled} in the finally block of {@link #invoke()}. NOTE(review): that finally also runs on normal completion, so this is not limited to the exception case — confirm intent. */
+	private volatile boolean failing;
+
 	private boolean disposedOperators;
 
 	/** Thread pool for async snapshot workers. */
@@ -478,6 +484,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			afterInvoke();
 		}
 		finally {
+			failing = !canceled;
 			cleanUpInvoke();
 		}
 	}
@@ -488,6 +495,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 
 	private void afterInvoke() throws Exception {
 		LOG.debug("Finished task {}", getName());
+		getCompletionFuture().exceptionally(unused -> null).join();
 
 		// make sure no further checkpoint and notification actions happen.
 		// we make sure that no other thread is currently in the locked scope before
@@ -526,6 +534,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 	}
 
 	private void cleanUpInvoke() throws Exception {
+		getCompletionFuture().exceptionally(unused -> null).join();
 		// clean up everything we initialized
 		isRunning = false;
 
@@ -575,6 +584,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		mailboxProcessor.close();
 	}
 
+	protected CompletableFuture<Void> getCompletionFuture() {
+		return FutureUtils.completedVoidFuture(); // already completed: by default invoke()/cancel() have nothing extra to wait for; SourceStreamTask overrides this
+	}
+
 	@Override
 	public final void cancel() throws Exception {
 		isRunning = false;
@@ -586,8 +599,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 			cancelTask();
 		}
 		finally {
-			mailboxProcessor.allActionsCompleted();
-			cancelables.close();
+			getCompletionFuture()
+				.whenComplete((unusedResult, unusedError) -> {
+					// WARN: the method is called from the task thread but the callback can be invoked from a different thread
+					mailboxProcessor.allActionsCompleted();
+					try {
+						cancelables.close();
+					} catch (IOException e) {
+						throw new CompletionException(e);
+					}
+				});
 		}
 	}
 
@@ -628,6 +649,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
 		}
 	}
 
+	public final boolean isFailing() {
+		return failing; // assigned !canceled in the finally block of invoke()
+	}
+
 	private void shutdownAsyncThreads() throws Exception {
 		if (!asyncOperationsThreadPool.isShutdown()) {
 			asyncOperationsThreadPool.shutdownNow();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 4239b57..c1f57ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -464,6 +464,24 @@ public class SourceStreamTaskTest {
 		testHarness.waitForTaskCompletion();
 	}
 
+	@Test
+	public void testWaitsForSourceThreadOnCancel() throws Exception {
+		StreamTaskTestHarness<String> harness = new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);

+		harness.setupOutputForSingletonOperatorChain();
+		harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));

+		harness.invoke();
+		NonStoppingSource.waitForStart(); // block until the source thread has entered NonStoppingSource#run

+		harness.getTask().cancel(); // NonStoppingSource#cancel is a no-op and interrupts are ignored, so the source keeps running
+		harness.waitForTaskCompletion(500, true); // don't fail on a CancelTaskException if the task exited prematurely
+		assertTrue(harness.taskThread.isAlive()); // the task thread must still be waiting for the source thread

+		NonStoppingSource.forceCancel(); // lets NonStoppingSource#run leave its loop
+		harness.waitForTaskCompletion(Long.MAX_VALUE, true);
+	}
+
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
 		private static final long serialVersionUID = 1;
 
@@ -570,6 +588,37 @@ public class SourceStreamTaskTest {
 		}
 	}
 
+	private static class NonStoppingSource implements SourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+		private static volatile boolean running = true; // volatile: written by the test thread, polled by the source thread's loop
+		private static final CompletableFuture<Void> startFuture = new CompletableFuture<>();

+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			startFuture.complete(null); // lets waitForStart() return once run() has been entered
+			while (running) {
+				try {
+					Thread.sleep(500);
+				} catch (InterruptedException e) {
+					// ignored on purpose: only forceCancel() may stop this source
+				}
+			}
+		}

+		@Override
+		public void cancel() {
+			// do nothing - ignore usual cancellation
+		}

+		static void forceCancel() {
+			running = false;
+		}

+		static void waitForStart() {
+			startFuture.join();
+		}
+	}
+
 	private static class OpenCloseTestSource extends RichSourceFunction<String> {
 		private static final long serialVersionUID = 1L;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 79fb67e..c91334a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
@@ -48,6 +49,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Assert;
@@ -261,6 +263,10 @@ public class StreamTaskTestHarness<OUT> {
 		waitForTaskCompletion(Long.MAX_VALUE);
 	}
 
+	public void waitForTaskCompletion(long timeout) throws Exception {
+		waitForTaskCompletion(timeout, false); // any task error, including CancelTaskException, is rethrown
+	}
+
 	/**
 	 * Waits for the task completion. If this does not happen within the timeout, then a
 	 * TimeoutException is thrown.
@@ -268,12 +274,14 @@ public class StreamTaskTestHarness<OUT> {
 	 * @param timeout Timeout for the task completion
 	 * @throws Exception
 	 */
-	public void waitForTaskCompletion(long timeout) throws Exception {
+	public void waitForTaskCompletion(long timeout, boolean ignoreCancellationException) throws Exception { // NOTE(review): on join timeout this returns silently; no TimeoutException is thrown
 		Preconditions.checkState(taskThread != null, "Task thread was not started.");

 		taskThread.join(timeout);
 		if (taskThread.getError() != null) {
-			throw new Exception("error in task", taskThread.getError());
+			if (!ignoreCancellationException || !ExceptionUtils.findThrowable(taskThread.getError(), CancelTaskException.class).isPresent()) { // rethrow unless asked to ignore errors where findThrowable locates a CancelTaskException
+				throw new Exception("error in task", taskThread.getError());
+			}
 		}
 	}
 
@@ -432,7 +440,7 @@ public class StreamTaskTestHarness<OUT> {
 
 	// ------------------------------------------------------------------------
 
-	private class TaskThread extends Thread {
+	class TaskThread extends Thread {
 
 		private final Supplier<? extends StreamTask<OUT, ?>> taskFactory;
 		private volatile StreamTask<OUT, ?> task;