You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/08 09:33:15 UTC

[flink] 07/08: [FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 054e27cc8ed7364b5637fc8debf76236fb3d9263
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Nov 11 13:45:01 2020 +0800

    [FLINK-20081][connector/common][source] The SourceCoordinator should fail the job instead of killing JM when it catches an unhandled exception.
---
 .../coordinator/SourceCoordinatorProvider.java      | 11 +++++++----
 .../coordinator/SourceCoordinatorProviderTest.java  | 21 +++++++++++++++++++++
 2 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
index 0304a88..4047bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProvider.java
@@ -24,7 +24,6 @@ import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator;
-import org.apache.flink.runtime.util.FatalExitExceptionHandler;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -67,7 +66,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
 	public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) throws Exception  {
 		final String coordinatorThreadName = "SourceCoordinator-" + operatorName;
 		CoordinatorExecutorThreadFactory coordinatorThreadFactory =
-				new CoordinatorExecutorThreadFactory(coordinatorThreadName);
+				new CoordinatorExecutorThreadFactory(coordinatorThreadName, context);
 		ExecutorService coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 		SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
 		SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
@@ -81,10 +80,14 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
 	 */
 	public static class CoordinatorExecutorThreadFactory implements ThreadFactory {
 		private final String coordinatorThreadName;
+		private final OperatorCoordinator.Context context;
 		private Thread t;
 
-		CoordinatorExecutorThreadFactory(String coordinatorThreadName) {
+		CoordinatorExecutorThreadFactory(
+				String coordinatorThreadName,
+				OperatorCoordinator.Context context) {
 			this.coordinatorThreadName = coordinatorThreadName;
+			this.context = context;
 			this.t = null;
 		}
 
@@ -95,7 +98,7 @@ public class SourceCoordinatorProvider<SplitT extends SourceSplit> extends Recre
 						"SingleThreadExecutor.");
 			}
 			t = new Thread(r, coordinatorThreadName);
-			t.setUncaughtExceptionHandler(FatalExitExceptionHandler.INSTANCE);
+			t.setUncaughtExceptionHandler((thread, throwable) -> context.failJob(throwable));
 			return t;
 		}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
index 3e736bb..c0fc3a8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorProviderTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.source.coordinator;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 
 import static org.junit.Assert.assertEquals;
@@ -96,4 +98,36 @@ public class SourceCoordinatorProviderTest {
 				restoredSourceCoordinator.getContext().registeredReaders().get(0));
 	}
 
+	/**
+	 * Verifies that an exception escaping a {@code callAsync} handler fails the job through
+	 * {@link OperatorCoordinator.Context#failJob(Throwable)} instead of killing the JobManager
+	 * process.
+	 */
+	@Test
+	public void testCallAsyncExceptionFailsJob() throws Exception {
+		MockOperatorCoordinatorContext context =
+			new MockOperatorCoordinatorContext(OPERATOR_ID, NUM_SPLITS);
+		RecreateOnResetOperatorCoordinator coordinator =
+			(RecreateOnResetOperatorCoordinator) provider.create(context);
+		try {
+			SourceCoordinator<?, ?> sourceCoordinator =
+				(SourceCoordinator<?, ?>) coordinator.getInternalCoordinator();
+			// The handler is expected to run on the coordinator executor thread, where this
+			// exception goes uncaught and must reach the thread's uncaught exception handler.
+			sourceCoordinator.getContext().callAsync(
+				() -> null,
+				(ignored, e) -> {
+					throw new RuntimeException();
+				});
+			CommonTestUtils.waitUtil(
+				context::isJobFailed,
+				Duration.ofSeconds(10L),
+				"The job did not fail before timeout.");
+		} finally {
+			// Close the coordinator so its executor does not outlive this test and leak into
+			// subsequent tests running in the same JVM.
+			coordinator.close();
+		}
+	}
+
 }