You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:33 UTC

[flink] 03/10: [hotfix][core] Add to Source Enumerator convenience methods to assign single split

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 723e1790744ddc78d19c7c978442af1383f38d33
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 16:58:00 2020 +0200

    [hotfix][core] Add to Source Enumerator convenience methods to assign single split
---
 .../flink/api/connector/source/SplitEnumeratorContext.java  | 13 +++++++++++++
 .../apache/flink/api/connector/source/SplitsAssignment.java |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 5aee6dd..8ec8618 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -71,6 +71,19 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);
 
 	/**
+	 * Assigns a single split.
+	 *
+	 * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+	 * call to the {@link #assignSplits(SplitsAssignment)} method.
+	 *
+	 * @param split The new split
+	 * @param subtask The index of the operator's parallel subtask that shall receive the split.
+	 */
+	default void assignSplit(SplitT split, int subtask) {
+		assignSplits(new SplitsAssignment<>(split, subtask));
+	}
+
+	/**
 	 * Invoke the callable and handover the return value to the handler which will be executed
 	 * by the source coordinator. When this method is invoked multiple times, the <code>Callable</code>s
 	 * may be executed in a thread pool concurrently.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index 6331788..5c08922 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,12 +33,18 @@ import java.util.Map;
  */
 @PublicEvolving
 public final class SplitsAssignment<SplitT extends SourceSplit> {
+
 	private final Map<Integer, List<SplitT>> assignment;
 
 	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
 		this.assignment = assignment;
 	}
 
+	public SplitsAssignment(SplitT split, int subtask) {
+		this.assignment = new HashMap<>();
+		this.assignment.put(subtask, Collections.singletonList(split));
+	}
+
 	/**
 	 * @return A mapping from subtask ID to their split assignment.
 	 */