You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/12/08 09:33:16 UTC
[flink] 08/08: [FLINK-20081][connector/common][source] Fix the
executor notifier to let the handler run in main thread when handling
exception from the callable.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 47c2d97499ed95a33c071e916eb8bde7b179e879
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Wed Nov 11 13:46:54 2020 +0800
[FLINK-20081][connector/common][source] Fix the executor notifier to let the handler run in main thread when handling exception from the callable.
This closes #14030
---
.../connector/source/SplitEnumeratorContext.java | 4 ++++
.../source/coordinator/ExecutorNotifier.java | 5 ++---
.../source/coordinator/ExecutorNotifierTest.java | 25 +++++++++++++++++++++-
.../coordinator/SourceCoordinatorContextTest.java | 3 ++-
.../coordinator/SourceCoordinatorTestBase.java | 3 ++-
5 files changed, 34 insertions(+), 6 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 8ec8618..c85700c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -92,6 +92,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
* the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
* there might be unexpected behavior.
*
+ * <p>Note that an exception thrown from the handler would result in failing the job.
+ *
* @param callable a callable to call.
* @param handler a handler that handles the return value of or the exception thrown from the callable.
*/
@@ -106,6 +108,8 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
* the states that will be a part of the {@link SplitEnumerator#snapshotState()}. Otherwise the
* there might be unexpected behavior.
*
+ * <p>Note that an exception thrown from the handler would result in failing the job.
+ *
* @param callable the callable to call.
* @param handler a handler that handles the return value of or the exception thrown from the callable.
* @param initialDelay the initial delay of calling the callable.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
index c865df5..35b9a33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java
@@ -83,8 +83,7 @@ public class ExecutorNotifier implements AutoCloseable {
T result = callable.call();
executorToNotify.execute(() -> handler.accept(result, null));
} catch (Throwable t) {
- LOG.error("Unexpected exception {}", t);
- handler.accept(null, t);
+ executorToNotify.execute(() -> handler.accept(null, t));
}
});
}
@@ -133,7 +132,7 @@ public class ExecutorNotifier implements AutoCloseable {
T result = callable.call();
executorToNotify.execute(() -> handler.accept(result, null));
} catch (Throwable t) {
- handler.accept(null, t);
+ executorToNotify.execute(() -> handler.accept(null, t));
}
}, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
index bbd9a7d..f5f179a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifierTest.java
@@ -94,7 +94,30 @@ public class ExecutorNotifierTest {
}
@Test
- public void testExceptionInHandler() throws InterruptedException {
+ public void testExceptionInHandlerWhenHandlingException() throws InterruptedException {
+ Exception exception1 = new Exception("Expected exception.");
+ RuntimeException exception2 = new RuntimeException("Expected exception.");
+ CountDownLatch latch = new CountDownLatch(1);
+ notifier.notifyReadyAsync(
+ () -> {
+ throw exception1;
+ },
+ (v, e) -> {
+ assertEquals(exception1, e);
+ assertNull(v);
+ latch.countDown();
+ throw exception2;
+ });
+ latch.await();
+ closeExecutorToNotify();
+ // The uncaught exception handler may fire after the executor has shutdown.
+ // We need to wait on the countdown latch here.
+ exceptionInHandlerLatch.await(10000L, TimeUnit.MILLISECONDS);
+ assertEquals(exception2, exceptionInHandler);
+ }
+
+ @Test
+ public void testExceptionInHandlerWhenHandlingResult() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
RuntimeException exception = new RuntimeException("Expected exception.");
notifier.notifyReadyAsync(() -> 1234, (v, e) -> {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index 2441192..280f6a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -144,7 +144,8 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
SourceCoordinatorContext<MockSourceSplit> restoredContext;
SplitAssignmentTracker<MockSourceSplit> restoredTracker = new SplitAssignmentTracker<>();
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
- new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(TEST_OPERATOR_ID.toHexString());
+ new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+ TEST_OPERATOR_ID.toHexString(), operatorCoordinatorContext);
try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
DataInputStream in = new DataInputStream(bais)) {
restoredContext = new SourceCoordinatorContext<>(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
index a46b1ad..22fb4a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTestBase.java
@@ -57,7 +57,8 @@ public abstract class SourceCoordinatorTestBase {
splitSplitAssignmentTracker = new SplitAssignmentTracker<>();
String coordinatorThreadName = TEST_OPERATOR_ID.toHexString();
SourceCoordinatorProvider.CoordinatorExecutorThreadFactory coordinatorThreadFactory =
- new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(coordinatorThreadName);
+ new SourceCoordinatorProvider.CoordinatorExecutorThreadFactory(
+ coordinatorThreadName, operatorCoordinatorContext);
coordinatorExecutor = Executors.newSingleThreadExecutor(coordinatorThreadFactory);
context = new SourceCoordinatorContext<>(
coordinatorExecutor,