You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/08 09:33:16 UTC

[flink] 08/08: [FLINK-20081][connector/common][source] Fix the executor notifier to let the handler run in main thread when handling exception from the callable.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47c2d97499ed95a33c071e916eb8bde7b179e879
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Nov 11 13:46:54 2020 +0800

    [FLINK-20081][connector/common][source] Fix the executor notifier to let the handler run in main thread when handling exception from the callable.
    
    This closes #14030
---
 .../connector/source/SplitEnumeratorContext.java   |  4 ++++
 .../source/coordinator/ExecutorNotifier.java       |  5 ++---
 .../source/coordinator/ExecutorNotifierTest.java   | 25 +++++++++++++++++++++-
 .../coordinator/SourceCoordinatorContextTest.java  |  3 ++-
 .../coordinator/SourceCoordinatorTestBase.java     |  3 ++-
 5 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 8ec8618..c85700c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -92,6 +92,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	 * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
 	 * there might be unexpected behavior.
 	 *
+	 * <p>Note that an exception thrown from the handler would result in failing the job.
+	 *
 	 * @param callable a callable to call.
 	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
 	 */
@@ -106,6 +108,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	 * the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
 	 * there might be unexpected behavior.
 	 *
+	 * <p>Note that an exception thrown from the handler would result in failing the job.
+	 *
 	 * @param callable the callable to call.
 	 * @param handler a handler that handles the return value of or the exception thrown from the callable.
 	 * @param initialDelay the initial delay of calling the callable.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index c865df5..35b9a33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -83,8 +83,7 @@ public class ExecutorNotifier implements AutoCloseable {
 				T result = callable.call();
 				executorToNotify.execute(() -> handler.accept(result, null));
 			} catch (Throwable t) {
-				LOG.error("Unexpected exception {}", t);
-				handler.accept(null, t);
+				executorToNotify.execute(() -> handler.accept(null, t));
 			}
 		});
 	}
@@ -133,7 +132,7 @@ public class ExecutorNotifier implements AutoCloseable {
 				T result = callable.call();
 				executorToNotify.execute(() -> handler.accept(result, null));
 			} catch (Throwable t) {
-				handler.accept(null, t);
+				executorToNotify.execute(() -> handler.accept(null, t));
 			}
 		}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index bbd9a7d..f5f179a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -94,7 +94,30 @@ public class ExecutorNotifierTest {
 	}
 
 	@Test
-	public void testExceptionInHandler() throws InterruptedException {
+	public void testExceptionInHandlerWhenHandlingException() throws InterruptedException {
+		Exception exception1 = new Exception("Expected exception.");
+		RuntimeException exception2 =  new RuntimeException("Expected exception.");
+		CountDownLatch latch = new CountDownLatch(1);
+		notifier.notifyReadyAsync(
+			() -> {
+				throw exception1;
+			},
+			(v, e) -> {
+				assertEquals(exception1, e);
+				assertNull(v);
+				latch.countDown();
+				throw exception2;
+			});
+		latch.await();
+		closeExecutorToNotify();
+		// The uncaught exception handler may fire after the executor has shut down,
+		// so we need to wait on the countdown latch here.
+		exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
+		assertEquals(exception2, exceptionInHandler);
+	}
+
+	@Test
+	public void testExceptionInHandlerWhenHandlingResult() throws InterruptedException {
 		CountDownLatch latch = new CountDownLatch(1);
 		RuntimeException exception =  new RuntimeException("Expected exception.");
 		notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 2441192..280f6a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -144,7 +144,8 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
 		SourceCoordinatorContext<MockSourceSplit> restoredContext;
 		SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>();
 		SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
-				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+					TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext);
 		try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
 				DataInputStream in = new DataInputStream(bais)) {
 			restoredContext = new SourceCoordinatorContext<>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index a46b1ad..22fb4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -57,7 +57,8 @@ public abstract class SourceCoordinatorTestBase {
 		splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
 		String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
 		SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
-				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+				new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+					coordinatorThreadName, operatorCoordinatorContext);
 		coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
 		context = new SourceCoordinatorContext<>(
 				coordinatorExecutor,