You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2020/07/29 14:23:34 UTC
[flink] branch release-1.11 updated: [FLINK-15467][task] Wait for
sourceTaskThread to finish before exiting from invoke
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 1dee87a [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke
1dee87a is described below
commit 1dee87ae58cae3d11570e3b6bb9f45aa893a0c69
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 27 23:07:44 2020 +0200
[FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke
---
.../streaming/runtime/tasks/SourceStreamTask.java | 18 +++++++-
.../flink/streaming/runtime/tasks/StreamTask.java | 27 +++++++++++-
.../runtime/tasks/SourceStreamTaskTest.java | 49 ++++++++++++++++++++++
.../runtime/tasks/StreamTaskTestHarness.java | 14 +++++--
4 files changed, 101 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 405aff7..c28eee5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -152,7 +152,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
}
}
finally {
- sourceThread.interrupt();
+ if (sourceThread.isAlive()) {
+ sourceThread.interrupt();
+ } else if (!sourceThread.getCompletionFuture().isDone()) {
+ // source thread didn't start
+ sourceThread.getCompletionFuture().complete(null);
+ }
}
}
@@ -162,6 +167,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
cancelTask();
}
+ @Override
+ protected CompletableFuture<Void> getCompletionFuture() {
+ return sourceThread.getCompletionFuture();
+ }
+
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
@@ -212,8 +222,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
setName("Legacy Source Thread - " + taskDescription);
}
+ /**
+ * @return future that is completed once this thread completes. If this task is {@link #isFailing() failing} and this
+ * thread is not alive (e.g. it was never started), returns an already normally completed future instead.
+ */
CompletableFuture<Void> getCompletionFuture() {
- return completionFuture;
+ return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;
}
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a90c5f2..1246b21 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -90,6 +90,7 @@ import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -199,6 +200,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** Flag to mark this task as canceled. */
private volatile boolean canceled;
+ /** Flag to mark this task as failing, i.e. an exception has occurred inside {@link #invoke()} while the task was not canceled. */
+ private volatile boolean failing;
+
private boolean disposedOperators;
/** Thread pool for async snapshot workers. */
@@ -540,6 +544,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
afterInvoke();
}
catch (Exception invokeException) {
+ failing = !canceled;
try {
cleanUpInvoke();
}
@@ -562,6 +567,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
protected void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
+ getCompletionFuture().exceptionally(unused -> null).join();
final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();
@@ -600,6 +606,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
protected void cleanUpInvoke() throws Exception {
+ getCompletionFuture().exceptionally(unused -> null).join();
// clean up everything we initialized
isRunning = false;
@@ -655,6 +662,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
mailboxProcessor.close();
}
+ protected CompletableFuture<Void> getCompletionFuture() {
+ return FutureUtils.completedVoidFuture();
+ }
+
@Override
public final void cancel() throws Exception {
isRunning = false;
@@ -666,8 +677,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
cancelTask();
}
finally {
- mailboxProcessor.allActionsCompleted();
- cancelables.close();
+ getCompletionFuture()
+ .whenComplete((unusedResult, unusedError) -> {
+ // WARN: the method is called from the task thread but the callback can be invoked from a different thread
+ mailboxProcessor.allActionsCompleted();
+ try {
+ cancelables.close();
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ });
}
}
@@ -683,6 +702,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
return canceled;
}
+ public final boolean isFailing() {
+ return failing;
+ }
+
private void shutdownAsyncThreads() throws Exception {
if (!asyncOperationsThreadPool.isShutdown()) {
asyncOperationsThreadPool.shutdownNow();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 27c2576..deb5a62 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -506,6 +506,24 @@ public class SourceStreamTaskTest {
testHarness.waitForTaskCompletion();
}
+ @Test
+ public void testWaitsForSourceThreadOnCancel() throws Exception {
+ StreamTaskTestHarness<String> harness = new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+
+ harness.setupOutputForSingletonOperatorChain();
+ harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));
+
+ harness.invoke();
+ NonStoppingSource.waitForStart();
+
+ harness.getTask().cancel();
+ harness.waitForTaskCompletion(500, true); // allow task to exit prematurely
+ assertTrue(harness.taskThread.isAlive());
+
+ NonStoppingSource.forceCancel();
+ harness.waitForTaskCompletion(Long.MAX_VALUE, true);
+ }
+
private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
@@ -612,6 +630,37 @@ public class SourceStreamTaskTest {
}
}
+ private static class NonStoppingSource implements SourceFunction<String> {
+ private static final long serialVersionUID = 1L;
+ private static volatile boolean running = true; // volatile: written via forceCancel() by the test, polled by run() on another thread
+ private static CompletableFuture<Void> startFuture = new CompletableFuture<>();
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ startFuture.complete(null);
+ while (running) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // deliberately ignored: this source keeps running despite interruption until forceCancel() is called
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // do nothing - ignore usual cancellation
+ }
+
+ static void forceCancel() {
+ running = false;
+ }
+
+ static void waitForStart() {
+ startFuture.join();
+ }
+ }
+
private static class OpenCloseTestSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 3d3e034..21c98eb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -26,6 +26,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -53,6 +54,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.SupplierWithException;
@@ -273,18 +275,24 @@ public class StreamTaskTestHarness<OUT> {
waitForTaskCompletion(Long.MAX_VALUE);
}
+ public void waitForTaskCompletion(long timeout) throws Exception {
+ waitForTaskCompletion(timeout, false);
+ }
+
/**
* Waits for the task completion. If this does not happen within the timeout, then a
* TimeoutException is thrown.
*
* @param timeout Timeout for the task completion
*/
- public void waitForTaskCompletion(long timeout) throws Exception {
+ public void waitForTaskCompletion(long timeout, boolean ignoreCancellationException) throws Exception {
Preconditions.checkState(taskThread != null, "Task thread was not started.");
taskThread.join(timeout);
if (taskThread.getError() != null) {
- throw new Exception("error in task", taskThread.getError());
+ if (!ignoreCancellationException || !ExceptionUtils.findThrowable(taskThread.getError(), CancelTaskException.class).isPresent()) {
+ throw new Exception("error in task", taskThread.getError());
+ }
}
}
@@ -439,7 +447,7 @@ public class StreamTaskTestHarness<OUT> {
// ------------------------------------------------------------------------
- private class TaskThread extends Thread {
+ class TaskThread extends Thread {
private final SupplierWithException<? extends StreamTask<OUT, ?>, Exception> taskFactory;
private volatile StreamTask<OUT, ?> task;