You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/16 19:22:41 UTC
[flink] branch release-1.10 updated: [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new a20fb4a [FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.
a20fb4a is described below
commit a20fb4a796d8df91e4ebbcdc0b35cdd75145c574
Author: Roman Khachatryan <kh...@gmail.com>
AuthorDate: Mon Jul 27 23:07:44 2020 +0200
[FLINK-15467][task] Wait for sourceTaskThread to finish before exiting from invoke.
This closes #13383.
---
.../streaming/runtime/tasks/SourceStreamTask.java | 18 +++++++-
.../flink/streaming/runtime/tasks/StreamTask.java | 29 ++++++++++++-
.../runtime/tasks/SourceStreamTaskTest.java | 49 ++++++++++++++++++++++
.../runtime/tasks/StreamTaskTestHarness.java | 14 +++++--
4 files changed, 103 insertions(+), 7 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 6308c63..cb20cde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -141,7 +141,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
}
}
finally {
- sourceThread.interrupt();
+ if (sourceThread.isAlive()) {
+ sourceThread.interrupt();
+ } else if (!sourceThread.getCompletionFuture().isDone()) {
+ // source thread didn't start
+ sourceThread.getCompletionFuture().complete(null);
+ }
}
}
@@ -151,6 +156,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
cancelTask();
}
+ @Override
+ protected CompletableFuture<Void> getCompletionFuture() {
+ return sourceThread.getCompletionFuture();
+ }
+
// ------------------------------------------------------------------------
// Checkpointing
// ------------------------------------------------------------------------
@@ -209,8 +219,12 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
setName("Legacy Source Thread - " + taskDescription);
}
+ /**
+ * @return future that is completed once this thread completes. If this task {@link #isFailing()} and this thread
+ * is not alive (e.g. not started) returns a normally completed future.
+ */
CompletableFuture<Void> getCompletionFuture() {
- return completionFuture;
+ return isFailing() && !isAlive() ? CompletableFuture.completedFuture(null) : completionFuture;
}
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8e932bf..f276de6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
@@ -84,12 +85,14 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -207,6 +210,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
/** Flag to mark this task as canceled. */
private volatile boolean canceled;
+ /** Flag to mark this task as failing, i.e. if an exception has occurred inside {@link #invoke()}. */
+ private volatile boolean failing;
+
private boolean disposedOperators;
/** Thread pool for async snapshot workers. */
@@ -478,6 +484,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
afterInvoke();
}
finally {
+ failing = !canceled;
cleanUpInvoke();
}
}
@@ -488,6 +495,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
private void afterInvoke() throws Exception {
LOG.debug("Finished task {}", getName());
+ getCompletionFuture().exceptionally(unused -> null).join();
// make sure no further checkpoint and notification actions happen.
// we make sure that no other thread is currently in the locked scope before
@@ -526,6 +534,7 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
private void cleanUpInvoke() throws Exception {
+ getCompletionFuture().exceptionally(unused -> null).join();
// clean up everything we initialized
isRunning = false;
@@ -575,6 +584,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
mailboxProcessor.close();
}
+ protected CompletableFuture<Void> getCompletionFuture() {
+ return FutureUtils.completedVoidFuture();
+ }
+
@Override
public final void cancel() throws Exception {
isRunning = false;
@@ -586,8 +599,16 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
cancelTask();
}
finally {
- mailboxProcessor.allActionsCompleted();
- cancelables.close();
+ getCompletionFuture()
+ .whenComplete((unusedResult, unusedError) -> {
+ // WARN: the method is called from the task thread but the callback can be invoked from a different thread
+ mailboxProcessor.allActionsCompleted();
+ try {
+ cancelables.close();
+ } catch (IOException e) {
+ throw new CompletionException(e);
+ }
+ });
}
}
@@ -628,6 +649,10 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
}
}
+ public final boolean isFailing() {
+ return failing;
+ }
+
private void shutdownAsyncThreads() throws Exception {
if (!asyncOperationsThreadPool.isShutdown()) {
asyncOperationsThreadPool.shutdownNow();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 4239b57..c1f57ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -464,6 +464,24 @@ public class SourceStreamTaskTest {
testHarness.waitForTaskCompletion();
}
+ @Test
+ public void testWaitsForSourceThreadOnCancel() throws Exception {
+ StreamTaskTestHarness<String> harness = new StreamTaskTestHarness<>(SourceStreamTask::new, BasicTypeInfo.STRING_TYPE_INFO);
+
+ harness.setupOutputForSingletonOperatorChain();
+ harness.getStreamConfig().setStreamOperator(new StreamSource<>(new NonStoppingSource()));
+
+ harness.invoke();
+ NonStoppingSource.waitForStart();
+
+ harness.getTask().cancel();
+ harness.waitForTaskCompletion(500, true); // allow task to exit prematurely
+ assertTrue(harness.taskThread.isAlive());
+
+ NonStoppingSource.forceCancel();
+ harness.waitForTaskCompletion(Long.MAX_VALUE, true);
+ }
+
private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> {
private static final long serialVersionUID = 1;
@@ -570,6 +588,37 @@ public class SourceStreamTaskTest {
}
}
+ private static class NonStoppingSource implements SourceFunction<String> {
+ private static final long serialVersionUID = 1L;
+ private static boolean running = true;
+ private static CompletableFuture<Void> startFuture = new CompletableFuture<>();
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ startFuture.complete(null);
+ while (running) {
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public void cancel() {
+ // do nothing - ignore usual cancellation
+ }
+
+ static void forceCancel() {
+ running = false;
+ }
+
+ static void waitForStart() {
+ startFuture.join();
+ }
+ }
+
private static class OpenCloseTestSource extends RichSourceFunction<String> {
private static final long serialVersionUID = 1L;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 79fb67e..c91334a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
@@ -48,6 +49,7 @@ import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
@@ -261,6 +263,10 @@ public class StreamTaskTestHarness<OUT> {
waitForTaskCompletion(Long.MAX_VALUE);
}
+ public void waitForTaskCompletion(long timeout) throws Exception {
+ waitForTaskCompletion(timeout, false);
+ }
+
/**
* Waits for the task completion. If this does not happen within the timeout, then a
* TimeoutException is thrown.
@@ -268,12 +274,14 @@ public class StreamTaskTestHarness<OUT> {
* @param timeout Timeout for the task completion
* @throws Exception
*/
- public void waitForTaskCompletion(long timeout) throws Exception {
+ public void waitForTaskCompletion(long timeout, boolean ignoreCancellationException) throws Exception {
Preconditions.checkState(taskThread != null, "Task thread was not started.");
taskThread.join(timeout);
if (taskThread.getError() != null) {
- throw new Exception("error in task", taskThread.getError());
+ if (!ignoreCancellationException || !ExceptionUtils.findThrowable(taskThread.getError(), CancelTaskException.class).isPresent()) {
+ throw new Exception("error in task", taskThread.getError());
+ }
}
}
@@ -432,7 +440,7 @@ public class StreamTaskTestHarness<OUT> {
// ------------------------------------------------------------------------
- private class TaskThread extends Thread {
+ class TaskThread extends Thread {
private final Supplier<? extends StreamTask<OUT, ?>> taskFactory;
private volatile StreamTask<OUT, ?> task;