You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:33 UTC
[flink] 03/10: [hotfix][core] Add to Source Enumerator convenience
methods to assign single split
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 723e1790744ddc78d19c7c978442af1383f38d33
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 16:58:00 2020 +0200
[hotfix][core] Add to Source Enumerator convenience methods to assign single split
---
.../flink/api/connector/source/SplitEnumeratorContext.java | 13 +++++++++++++
.../apache/flink/api/connector/source/SplitsAssignment.java | 8 ++++++++
2 files changed, 21 insertions(+)
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 5aee6dd..8ec8618 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -71,6 +71,19 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);
/**
+ * Assigns a single split.
+ *
+ * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+ * call to the {@link #assignSplits(SplitsAssignment)} method.
+ *
+ * @param split The new split
+ * @param subtask The index of the operator's parallel subtask that shall receive the split.
+ */
+ default void assignSplit(SplitT split, int subtask) {
+ assignSplits(new SplitsAssignment<>(split, subtask));
+ }
+
+ /**
* Invoke the callable and handover the return value to the handler which will be executed
* by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s
* may be executed in a thread pool concurrently.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index 6331788..5c08922 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source;
import org.apache.flink.annotation.PublicEvolving;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -31,12 +33,18 @@ import java.util.Map;
*/
@PublicEvolving
public final class SplitsAssignment<SplitT extends SourceSplit> {
+
private final Map<Integer, List<SplitT>> assignment;
public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
this.assignment = assignment;
}
+ public SplitsAssignment(SplitT split, int subtask) {
+ this.assignment = new HashMap<>();
+ this.assignment.put(subtask, Collections.singletonList(split));
+ }
+
/**
* @return A mapping from subtask ID to their split assignment.
*/