You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 16:27:59 UTC
[flink] 02/02: [FLINK-18039] Ensure the source events are sent via
the coordinator thread.
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e01cab2f713802f6fb92f7472a258c07f2c18af7
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Jun 11 17:30:28 2020 +0800
[FLINK-18039] Ensure the source events are sent via the coordinator thread.
---
.../coordinator/SourceCoordinatorContext.java | 87 ++++++++++++++--------
1 file changed, 54 insertions(+), 33 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 322778a..b6ceda9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -128,13 +128,15 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
@Override
public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
- try {
- operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
- } catch (TaskNotRunningException e) {
- throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
- event,
- subtaskId), e);
- }
+ callInCoordinatorThread(() -> {
+ try {
+ operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+ return null;
+ } catch (TaskNotRunningException e) {
+ throw new FlinkRuntimeException(
+ String.format("Failed to send event %s to subtask %d", event, subtaskId), e);
+ }
+ }, String.format("Failed to send event %s to subtask %d", event, subtaskId));
}
@Override
@@ -150,34 +152,28 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
@Override
public void assignSplits(SplitsAssignment<SplitT> assignment) {
// Ensure the split assignment is done by the the coordinator executor.
- if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
- try {
- coordinatorExecutor.submit(() -> assignSplits(assignment)).get();
- return;
- } catch (InterruptedException | ExecutionException e) {
- throw new FlinkRuntimeException("Failed to assign splits due to", e);
+ callInCoordinatorThread(() -> {
+ // Ensure all the subtasks in the assignment have registered.
+ for (Integer subtaskId : assignment.assignment().keySet()) {
+ if (!registeredReaders.containsKey(subtaskId)) {
+ throw new IllegalArgumentException(String.format(
+ "Cannot assign splits %s to subtask %d because the subtask is not registered.",
+ registeredReaders.get(subtaskId), subtaskId));
+ }
}
- }
-
- // Ensure all the subtasks in the assignment have registered.
- for (Integer subtaskId : assignment.assignment().keySet()) {
- if (!registeredReaders.containsKey(subtaskId)) {
- throw new IllegalArgumentException(String.format(
- "Cannot assign splits %s to subtask %d because the subtask is not registered.",
- registeredReaders.get(subtaskId), subtaskId));
- }
- }
- assignmentTracker.recordSplitAssignment(assignment);
- assignment.assignment().forEach(
- (id, splits) -> {
- try {
- operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
- } catch (TaskNotRunningException e) {
- throw new FlinkRuntimeException(String.format(
- "Failed to assign splits %s to reader %d.", splits, id), e);
- }
- });
+ assignmentTracker.recordSplitAssignment(assignment);
+ assignment.assignment().forEach(
+ (id, splits) -> {
+ try {
+ operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+ } catch (TaskNotRunningException e) {
+ throw new FlinkRuntimeException(String.format(
+ "Failed to assign splits %s to reader %d.", splits, id), e);
+ }
+ });
+ return null;
+ }, String.format("Failed to assign splits %s due to ", assignment));
}
@Override
@@ -280,4 +276,29 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
void onCheckpointComplete(long checkpointId) {
assignmentTracker.onCheckpointComplete(checkpointId);
}
+
+ // ---------------- private helper methods -----------------
+
+ /**
+ * A helper method that delegates the callable to the coordinator thread if the
+ * current thread is not the coordinator thread, otherwise call the callable right away.
+ *
+ * @param callable the callable to delegate.
+ */
+ private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
+ // Ensure the callable is executed by the coordinator executor.
+ if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+ try {
+ return coordinatorExecutor.submit(callable).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new FlinkRuntimeException(errorMessage, e);
+ }
+ }
+
+ try {
+ return callable.call();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(errorMessage, e);
+ }
+ }
}