You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by jq...@apache.org on 2022/03/20 02:04:32 UTC

[flink] branch release-1.15 updated: [FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext

This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new e3992ab  [FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext
e3992ab is described below

commit e3992ab17a4515076c287b8d12003e1449718df1
Author: zoucao <zh...@hotmail.com>
AuthorDate: Sat Mar 19 00:45:19 2022 +0800

    [FLINK-26723][runtime] Fix the error message thrown by SourceCoordinatorContext
---
 .../source/coordinator/SourceCoordinatorContext.java  | 19 +++++++++++--------
 .../coordinator/SourceCoordinatorContextTest.java     |  2 +-
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 5e560fa..4434a18 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -195,14 +195,17 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit>
         callInCoordinatorThread(
                 () -> {
                     // Ensure all the subtasks in the assignment have registered.
-                    for (Integer subtaskId : assignment.assignment().keySet()) {
-                        if (!registeredReaders.containsKey(subtaskId)) {
-                            throw new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot assign splits %s to subtask %d because the subtask is not registered.",
-                                            registeredReaders.get(subtaskId), subtaskId));
-                        }
-                    }
+                    assignment
+                            .assignment()
+                            .forEach(
+                                    (id, splits) -> {
+                                        if (!registeredReaders.containsKey(id)) {
+                                            throw new IllegalArgumentException(
+                                                    String.format(
+                                                            "Cannot assign splits %s to subtask %d because the subtask is not registered.",
+                                                            splits, id));
+                                        }
+                                    });
 
                     assignmentTracker.recordSplitAssignment(assignment);
                     assignment
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
index d15f28e..380e65c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContextTest.java
@@ -153,7 +153,7 @@ public class SourceCoordinatorContextTest extends SourceCoordinatorTestBase {
                     }
                 },
                 "assignSplits() should fail to assign the splits to a reader that is not registered.",
-                "Cannot assign splits");
+                "Cannot assign splits " + splitsAssignment.assignment().get(0));
     }
 
     @Test