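You are viewing a plain text version of this content. The canonical version is the original post on the commits@flink.apache.org mailing list (its hyperlink was not preserved in this plain-text copy).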
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/08 09:33:15 UTC
[flink] 07/08: [FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 054e27cc8ed7364b5637fc8debf76236fb3d9263
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Nov 11 13:45:01 2020 +0800
[FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception.
---
.../coordinator/SourceCoordinatorProvider.java | 11 +++++++----
.../coordinator/SourceCoordinatorProviderTest.java | 21 +++++++++++++++++++++
2 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 0304a88..4047bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
-import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -67,7 +66,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception {
final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
- new CoordinatorExecutorThreadFactory(coordinatorThreadName);
+ new CoordinatorExecutorThreadFactory(coordinatorThreadName, context);
ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
@@ -81,10 +80,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
*/
public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
private final String coordinatorThreadName;
+ private final OperatorCoordinator.Context context;
private Thread t;
- CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+ CoordinatorExecutorThreadFactory(
+ String coordinatorThreadName,
+ OperatorCoordinator.Context context) {
this.coordinatorThreadName = coordinatorThreadName;
+ this.context = context;
this.t = null;
}
@@ -95,7 +98,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
"SingleThreadExecutor.");
}
t = new Thread(r, coordinatorThreadName);
- t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+ t.setUncaughtExceptionHandler((thread, throwable) -> context.failJob(throwable));
return t;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 3e736bb..c0fc3a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.source.coordinator;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.junit.Before;
import org.junit.Test;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import static org.junit.Assert.assertEquals;
@@ -96,4 +98,23 @@ public class SourceCoordinatorProviderTest {
restoredSourceCoordinator.getContext().registeredReaders().get(0));
}
+ @Test
+ public void testCallAsyncExceptionFailsJob() throws Exception {
+ MockOperatorCoordinatorContext context =
+ new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS);
+ RecreateOnResetOperatorCoordinator coordinator =
+ (RecreateOnResetOperatorCoordinator) provider.create(context);
+ SourceCoordinator<?, ?> sourceCoordinator =
+ (SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+ sourceCoordinator.getContext().callAsync(
+ () -> null,
+ (ignored, e) -> {
+ throw new RuntimeException();
+ });
+ CommonTestUtils.waitUtil(
+ context::isJobFailed,
+ Duration.ofSeconds(10L),
+ "The job did not fail before timeout.");
+ }
+
}