You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/03/20 02:04:32 UTC
[flink] branch release-1.15 updated: [FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext
This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new e3992ab [FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext
e3992ab is described below
commit e3992ab17a4515076c287b8d12003e1449718df1
Author: zoucao <zh...@hotmail.com>
AuthorDate: Sat Mar 19 00:45:19 2022 +0800
[FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext
---
.../source/coordinator/SourceCoordinatorContext.java | 19 +++++++++++--------
.../coordinator/SourceCoordinatorContextTest.java | 2 +-
2 files changed, 12 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 5e560fa..4434a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -195,14 +195,17 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
callInCoordinatorThread(
() -> {
// Ensure all the subtasks in the assignment have registered.
- for (Integer subtaskId : assignment.assignment().keySet()) {
- if (!registeredReaders.containsKey(subtaskId)) {
- throw new IllegalArgumentException(
- String.format(
- "Cannot assign splits %s to subtask %d because the subtask is not registered.",
- registeredReaders.get(subtaskId), subtaskId));
- }
- }
+ assignment
+ .assignment()
+ .forEach(
+ (id, splits) -> {
+ if (!registeredReaders.containsKey(id)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot assign splits %s to subtask %d because the subtask is not registered.",
+ splits, id));
+ }
+ });
assignmentTracker.recordSplitAssignment(assignment);
assignment
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index d15f28e..380e65c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -153,7 +153,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
}
},
"assignSplits() should fail to assign the splits to a reader that is not registered.",
- "Cannot assign splits");
+ "Cannot assign splits " + splitsAssignment.assignment().get(0));
}
@Test