You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2020/06/15 16:27:59 UTC

[flink] 02/02: [FLINK-18039] Ensure the source events are sent via the coordinator thread.

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e01cab2f713802f6fb92f7472a258c07f2c18af7
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Thu Jun 11 17:30:28 2020 +0800

    [FLINK-18039] Ensure the source events are sent via the coordinator thread.
---
 .../coordinator/SourceCoordinatorContext.java      | 87 ++++++++++++++--------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 322778a..b6ceda9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -128,13 +128,15 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 
 	@Override
 	public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
-		try {
-			operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
-		} catch (TaskNotRunningException e) {
-			throw new FlinkRuntimeException(String.format("Failed to send event %s to subtask %d",
-					event,
-					subtaskId), e);
-		}
+		callInCoordinatorThread(() -> {
+			try {
+				operatorCoordinatorContext.sendEvent(new SourceEventWrapper(event), subtaskId);
+				return null;
+			} catch (TaskNotRunningException e) {
+				throw new FlinkRuntimeException(
+						String.format("Failed to send event %s to subtask %d", event, subtaskId), e);
+			}
+		}, String.format("Failed to send event %s to subtask %d", event, subtaskId));
 	}
 
 	@Override
@@ -150,34 +152,28 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 	@Override
 	public void assignSplits(SplitsAssignment<SplitT> assignment) {
 		// Ensure the split assignment is done by the the coordinator executor.
-		if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
-			try {
-				coordinatorExecutor.submit(() -> assignSplits(assignment)).get();
-				return;
-			} catch (InterruptedException | ExecutionException e) {
-				throw new FlinkRuntimeException("Failed to assign splits due to", e);
+		callInCoordinatorThread(() -> {
+			// Ensure all the subtasks in the assignment have registered.
+			for (Integer subtaskId : assignment.assignment().keySet()) {
+				if (!registeredReaders.containsKey(subtaskId)) {
+					throw new IllegalArgumentException(String.format(
+							"Cannot assign splits %s to subtask %d because the subtask is not registered.",
+							registeredReaders.get(subtaskId), subtaskId));
+				}
 			}
-		}
-
-		// Ensure all the subtasks in the assignment have registered.
-		for (Integer subtaskId : assignment.assignment().keySet()) {
-			if (!registeredReaders.containsKey(subtaskId)) {
-				throw new IllegalArgumentException(String.format(
-						"Cannot assign splits %s to subtask %d because the subtask is not registered.",
-						registeredReaders.get(subtaskId), subtaskId));
-			}
-		}
 
-		assignmentTracker.recordSplitAssignment(assignment);
-		assignment.assignment().forEach(
-				(id, splits) -> {
-					try {
-						operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
-					} catch (TaskNotRunningException e) {
-						throw new FlinkRuntimeException(String.format(
-								"Failed to assign splits %s to reader %d.", splits, id), e);
-					}
-				});
+			assignmentTracker.recordSplitAssignment(assignment);
+			assignment.assignment().forEach(
+					(id, splits) -> {
+						try {
+							operatorCoordinatorContext.sendEvent(new AddSplitEvent<>(splits), id);
+						} catch (TaskNotRunningException e) {
+							throw new FlinkRuntimeException(String.format(
+									"Failed to assign splits %s to reader %d.", splits, id), e);
+						}
+					});
+			return null;
+		}, String.format("Failed to assign splits %s due to ", assignment));
 	}
 
 	@Override
@@ -280,4 +276,29 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
 	void onCheckpointComplete(long checkpointId) {
 		assignmentTracker.onCheckpointComplete(checkpointId);
 	}
+
+	// ---------------- private helper methods -----------------
+
+	/**
+	 * Calls the callable right away when on the coordinator thread, else via the coordinator executor.
+	 *
+	 * @param callable the callable to delegate.
+	 * @param errorMessage the message of the FlinkRuntimeException thrown when the call fails.
+	 */
+	private <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
+		// Ensure the callable is executed by the coordinator executor.
+		if (!coordinatorThreadFactory.isCurrentThreadCoordinatorThread()) {
+			try {
+				return coordinatorExecutor.submit(callable).get();
+			} catch (InterruptedException | ExecutionException e) {
+				throw new FlinkRuntimeException(errorMessage, e);
+			}
+		}
+
+		try {
+			return callable.call();
+		} catch (Exception e) {
+			throw new FlinkRuntimeException(errorMessage, e);
+		}
+	}
 }