Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:30 UTC

[flink] branch master updated (4df2295 -> f42a3eb)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 4df2295  [FLINK-17016][runtime] Enable to use pipelined region scheduling strategy
     new e945ce8  [refactor][connectors] Backport of the connector-base exception handling from the Kafka Connector Pull Request
     new b2318ad  [FLINK-18447][build] Package 'flink-connector-base' into 'flink-dist'
     new 723e179  [hotfix][core] Add to Source Enumerator convenience methods to assign single split
     new 428fbba  [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
     new 930a074  [hotfix][connectors] Add RequestSplitEvent to 'flink-connector-base'
     new 8ebc464  [hotfix][testing] Add a set of parameterizable testing mocks for the Split Reader API
     new e3d273d  [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects.
     new 09a7a66  [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.
     new 23b551f  [hotfix][javadocs] Improve JavaDocs for StreamExecutionEnvironment.addSource(...)
     new f42a3eb  [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.

The 10 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/event/RequestSplitEvent.java       |  51 +++---
 .../base/source/reader/RecordsBySplits.java        | 179 +++++++++++++--------
 .../base/source/reader/RecordsWithSplitIds.java    |  29 ++--
 .../SingleThreadMultiplexSourceReaderBase.java     |  26 +--
 .../base/source/reader/SourceReaderBase.java       | 116 ++++++++-----
 .../base/source/reader/SplitsRecordIterator.java   |  96 -----------
 .../base/source/reader/fetcher/FetchTask.java      |  11 +-
 .../base/source/reader/fetcher/SplitFetcher.java   |  34 ++--
 .../source/reader/fetcher/SplitFetcherTask.java    |   5 +-
 .../source/reader/splitreader/SplitReader.java     |   4 +-
 .../source/reader/splitreader/SplitsAddition.java  |   5 +
 .../base/source/reader/SourceReaderBaseTest.java   | 141 ++++++++++++----
 .../base/source/reader/SourceReaderTestBase.java   |  13 +-
 .../source/reader/fetcher/SplitFetcherTest.java    |  10 +-
 .../base/source/reader/mocks/MockSplitReader.java  |   5 +-
 ...dEmitter.java => PassThroughRecordEmitter.java} |  14 +-
 .../source/reader/mocks/TestingReaderContext.java  |  78 +++++++++
 .../source/reader/mocks/TestingReaderOutput.java   |  71 ++++++++
 .../reader/mocks/TestingRecordsWithSplitIds.java   |  30 ++--
 .../source/reader/mocks/TestingSourceSplit.java    |  28 ++--
 .../source/reader/mocks/TestingSplitReader.java    |  65 ++++++++
 .../api/connector/source/SourceReaderContext.java  |  12 ++
 .../connector/source/SplitEnumeratorContext.java   |  13 ++
 .../api/connector/source/SplitsAssignment.java     |   8 +
 flink-dist/pom.xml                                 |   7 +
 .../streaming/api/datastream/DataStreamUtils.java  | 133 +++++++++++++--
 .../environment/StreamExecutionEnvironment.java    |  24 ++-
 .../streaming/api/operators/SourceOperator.java    |  28 +++-
 .../api/operators/SourceOperatorFactory.java       |  13 +-
 .../operators/source/TestingSourceOperator.java    |   5 +-
 30 files changed, 882 insertions(+), 372 deletions(-)
 copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/constraints/AbstractConstraint.java => flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java (54%)
 delete mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
 copy flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/{MockRecordEmitter.java => PassThroughRecordEmitter.java} (67%)
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
 copy flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/overlays/CompositeContainerOverlay.java => flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java (57%)
 copy flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Endpoint.java => flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java (66%)
 create mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java


[flink] 02/10: [FLINK-18447][build] Package 'flink-connector-base' into 'flink-dist'

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2318adea73b5490d3c6fa50ec22b5c56897148a
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 10:42:20 2020 +0200

    [FLINK-18447][build] Package 'flink-connector-base' into 'flink-dist'
---
 flink-dist/pom.xml | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index db1a1cc..b8a005c 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -137,6 +137,13 @@ under the License.
 			</exclusions>
 		</dependency>
 
+		<!-- Base Connector Support -->
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
 		<!-- Default file system support. The Hadoop and MapR dependencies -->
 		<!--       are optional, so not being added to the dist jar        -->
 


[flink] 05/10: [hotfix][connectors] Add RequestSplitEvent to 'flink-connector-base'

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 930a07438be1185388d7150640f294dfe2a5d378
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 22:01:50 2020 +0200

    [hotfix][connectors] Add RequestSplitEvent to 'flink-connector-base'
---
 .../base/source/event/RequestSplitEvent.java       | 71 ++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java
new file mode 100644
index 0000000..4bdb92a
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/event/RequestSplitEvent.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * An event to request splits, typically sent from the Source Reader to the Source Enumerator.
+ */
+public final class RequestSplitEvent implements SourceEvent {
+
+	private static final long serialVersionUID = 1L;
+
+	@Nullable
+	private final String hostName;
+
+	public RequestSplitEvent(@Nullable String hostName) {
+		this.hostName = hostName;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Nullable
+	public String hostName() {
+		return hostName;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public int hashCode() {
+		return 65932633 + Objects.hashCode(hostName);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		final RequestSplitEvent that = (RequestSplitEvent) o;
+		return Objects.equals(hostName, that.hostName);
+	}
+
+	@Override
+	public String toString() {
+		return String.format("RequestSplitEvent (host='%s')", hostName);
+	}
+}
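
For illustration, a minimal sketch of how a source reader might use this event
(the SplitRequester class is hypothetical; getLocalHostName() is the accessor
added to the SourceReaderContext in commit 04/10 of this series):

    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.connector.base.source.event.RequestSplitEvent;

    // Hypothetical helper, not part of this commit: asks the enumerator for
    // another split, passing the local hostname so the enumerator can prefer
    // splits that are local to this reader.
    final class SplitRequester {

        private final SourceReaderContext context;

        SplitRequester(SourceReaderContext context) {
            this.context = context;
        }

        void requestNextSplit() {
            context.sendSourceEventToCoordinator(
                new RequestSplitEvent(context.getLocalHostName()));
        }
    }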


[flink] 09/10: [hotfix][javadocs] Improve JavaDocs for StreamExecutionEnvironment.addSource(...)

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 23b551f6f5fd961b8ccb31144d2a0f2801e2feb6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 2 17:24:54 2020 +0200

    [hotfix][javadocs] Improve JavaDocs for StreamExecutionEnvironment.addSource(...)
---
 .../environment/StreamExecutionEnvironment.java    | 24 ++++++++++++++++++++--
 1 file changed, 22 insertions(+), 2 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index e559147..f5ef633 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1605,7 +1605,17 @@ public class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Add a data {@link Source} to the environment to get a {@link DataStream}.
+	 * Adds a data {@link Source} to the environment to get a {@link DataStream}.
+	 *
+	 * <p>The result will be either a bounded data stream (that can be processed in a batch way) or
+	 * an unbounded data stream (that must be processed in a streaming way), based on the
+	 * boundedness property of the source, as defined by {@link Source#getBoundedness()}.
+	 *
+	 * <p>The result type (that is used to create serializers for the produced data events)
+	 * will be automatically extracted. This is useful for sources that describe the produced types
+	 * already in their configuration, to avoid having to declare the type multiple times.
+	 * For example the file sources and Kafka sources already define the produced type via their
+	 * parsers/serializers/formats, and can forward that information.
 	 *
 	 * @param source
 	 * 		the user defined source
@@ -1624,7 +1634,17 @@ public class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Add a data {@link Source} to the environment to get a {@link DataStream}.
+	 * Adds a data {@link Source} to the environment to get a {@link DataStream}.
+	 *
+	 * <p>The result will be either a bounded data stream (that can be processed in a batch way) or
+	 * an unbounded data stream (that must be processed in a streaming way), based on the
+	 * boundedness property of the source, as defined by {@link Source#getBoundedness()}.
+	 *
+	 * <p>This method takes an explicit type information for the produced data stream, so that callers
+	 * can define directly what type/serializer will be used for the produced stream.
+	 * For sources that describe their produced type, the method
+	 * {@link #fromSource(Source, WatermarkStrategy, String)} can be used to avoid specifying the
+	 * produced type redundantly.
 	 *
 	 * @param source
 	 * 		the user defined source
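
For illustration, a sketch of the two variants described in the JavaDocs above
(the FromSourceSketch class is hypothetical; 'source' stands for any Source
implementation producing Strings):

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.connector.source.Source;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    final class FromSourceSketch {

        static void example(StreamExecutionEnvironment env, Source<String, ?, ?> source) {
            // Result type extracted automatically from the source.
            DataStream<String> a =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "my-source");

            // Result type declared explicitly by the caller.
            DataStream<String> b = env.fromSource(
                source, WatermarkStrategy.noWatermarks(), "my-source",
                TypeInformation.of(String.class));
        }
    }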


[flink] 04/10: [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 428fbbad85379f71a9fbf1a1ab3e3dcfa9006436
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Aug 26 15:50:16 2020 +0200

    [FLINK-19205][core] Add access to configuration and hostname in the SourceReaderContext
---
 .../api/connector/source/SourceReaderContext.java  | 12 ++++++++++
 .../streaming/api/operators/SourceOperator.java    | 28 ++++++++++++++++++----
 .../api/operators/SourceOperatorFactory.java       | 13 +++++++---
 .../operators/source/TestingSourceOperator.java    |  5 +++-
 4 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 658034b..c8ed36d 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 
 /**
@@ -33,6 +34,17 @@ public interface SourceReaderContext {
 	MetricGroup metricGroup();
 
 	/**
+	 * Gets the configuration with which Flink was started.
+	 */
+	Configuration getConfiguration();
+
+	/**
+	 * Gets the hostname of the machine where this reader is executed. This can be used
+	 * to request splits local to the machine, if needed.
+	 */
+	String getLocalHostName();
+
+	/**
 	 * Send a source event to the source coordinator.
 	 *
 	 * @param sourceEvent the source event to coordinator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 33e95f2..1af572c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.metrics.MetricGroup;
@@ -65,7 +66,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * @param <OUT> The output type of the operator.
  */
 @Internal
-@SuppressWarnings("serial")
+//@SuppressWarnings("serial")
 public class SourceOperator<OUT, SplitT extends SourceSplit>
 		extends AbstractStreamOperator<OUT>
 		implements OperatorEventHandler, PushingAsyncDataInput<OUT> {
@@ -89,6 +90,12 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 	/** The factory for timestamps and watermark generators. */
 	private final WatermarkStrategy<OUT> watermarkStrategy;
 
+	/** The Flink configuration. */
+	private final Configuration configuration;
+
+	/** Host name of the machine where the operator runs, to support locality aware work assignment. */
+	private final String localHostname;
+
 	// ---- lazily initialized fields (these fields are the "hot" fields) ----
 
 	/** The source reader that does most of the work. */
@@ -110,13 +117,17 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			OperatorEventGateway operatorEventGateway,
 			SimpleVersionedSerializer<SplitT> splitSerializer,
 			WatermarkStrategy<OUT> watermarkStrategy,
-			ProcessingTimeService timeService) {
+			ProcessingTimeService timeService,
+			Configuration configuration,
+			String localHostname) {
 
 		this.readerFactory = checkNotNull(readerFactory);
 		this.operatorEventGateway = checkNotNull(operatorEventGateway);
 		this.splitSerializer = checkNotNull(splitSerializer);
 		this.watermarkStrategy = checkNotNull(watermarkStrategy);
 		this.processingTimeService = timeService;
+		this.configuration = checkNotNull(configuration);
+		this.localHostname = checkNotNull(localHostname);
 	}
 
 	@Override
@@ -130,6 +141,16 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 			}
 
 			@Override
+			public Configuration getConfiguration() {
+				return configuration;
+			}
+
+			@Override
+			public String getLocalHostName() {
+				return localHostname;
+			}
+
+			@Override
 			public void sendSourceEventToCoordinator(SourceEvent event) {
 				operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
 			}
@@ -219,8 +240,7 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
 
 	private void registerReader() {
 		operatorEventGateway.sendEventToCoordinator(new ReaderRegistrationEvent(
-				getRuntimeContext().getIndexOfThisSubtask(),
-				"UNKNOWN_LOCATION"));
+				getRuntimeContext().getIndexOfThisSubtask(), localHostname));
 	}
 
 	// --------------- methods for unit tests ------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
index d1636cd..1d7aed7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
@@ -75,7 +76,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 				gateway,
 				source.getSplitSerializer(),
 				watermarkStrategy,
-				parameters.getProcessingTimeService());
+				parameters.getProcessingTimeService(),
+				parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(),
+				parameters.getContainingTask().getEnvironment().getTaskManagerInfo().getTaskManagerExternalAddress());
 
 		sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
 		parameters.getOperatorEventDispatcher().registerEventHandler(operatorId, sourceOperator);
@@ -115,7 +118,9 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 			OperatorEventGateway eventGateway,
 			SimpleVersionedSerializer<?> splitSerializer,
 			WatermarkStrategy<T> watermarkStrategy,
-			ProcessingTimeService timeService) {
+			ProcessingTimeService timeService,
+			Configuration config,
+			String localHostName) {
 
 		// jumping through generics hoops: cast the generics away to then cast them back more strictly typed
 		final Function<SourceReaderContext, SourceReader<T, SplitT>> typedReaderFactory =
@@ -128,6 +133,8 @@ public class SourceOperatorFactory<OUT> extends AbstractStreamOperatorFactory<OU
 				eventGateway,
 				typedSplitSerializer,
 				watermarkStrategy,
-				timeService);
+				timeService,
+				config,
+				localHostName);
 	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
index 038a211..a133ace 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/TestingSourceOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.coordination.MockOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
@@ -71,7 +72,9 @@ public class TestingSourceOperator<T>  extends SourceOperator<T, MockSourceSplit
 			eventGateway,
 			new MockSourceSplitSerializer(),
 			watermarkStrategy,
-			timeService);
+			timeService,
+			new Configuration(),
+			"localhost");
 
 		this.subtaskIndex = subtaskIndex;
 		this.parallelism = parallelism;
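
For illustration, a minimal sketch of a reader using the two new context methods
(the ReaderContextSketch class and the FETCH_SIZE option are hypothetical):

    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.ConfigOptions;

    final class ReaderContextSketch {

        // Hypothetical option, for illustration only.
        static final ConfigOption<Integer> FETCH_SIZE =
            ConfigOptions.key("my.source.fetch-size").intType().defaultValue(1024);

        static void configure(SourceReaderContext context) {
            // Read a tunable from the configuration Flink was started with.
            int fetchSize = context.getConfiguration().get(FETCH_SIZE);

            // The hostname enables locality-aware split requests (see commit 05/10).
            String host = context.getLocalHostName();
            System.out.println("reader on " + host + " uses fetch size " + fetchSize);
        }
    }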


[flink] 08/10: [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 09a7a66b7313fea64817fe960a8da1265b428efc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Wed Sep 2 15:23:27 2020 +0200

    [refactor][DataStream API] Make DataStreamUtils.collect() methods more flexible.
    
    This supports simple ways of pulling bounded streams to the client, as well as a defined number of
    elements from an unbounded stream.
---
 .../streaming/api/datastream/DataStreamUtils.java  | 133 +++++++++++++++++++--
 1 file changed, 123 insertions(+), 10 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
index 91d132c..45f4ad0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -31,9 +31,14 @@ import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A collection of utilities for {@link DataStream DataStreams}.
  */
@@ -41,33 +46,124 @@ import java.util.UUID;
 public final class DataStreamUtils {
 
 	/**
-	 * Returns an iterator to iterate over the elements of the DataStream.
-	 * @return The iterator
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
 	 */
 	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) {
+		return collect(stream, "Data Stream Collect");
+	}
+
+	/**
+	 * Triggers the distributed execution of the streaming dataflow and returns an iterator over the elements
+	 * of the given DataStream.
+	 *
+	 * <p>The DataStream application is executed in the regular distributed manner on the target environment,
+	 * and the events from the stream are polled back to this application process and thread through
+	 * Flink's REST API.
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream, String executionJobName) {
+		try {
+			return collectWithClient(stream, executionJobName).iterator;
+		} catch (Exception e) {
+			// this "wrap as unchecked" step is here only to keep the exception
+			// signature backwards compatible.
+			throw new RuntimeException("Failed to execute data stream", e);
+		}
+	}
+
+	/**
+	 * Starts the execution of the program and returns an iterator to read the result of the
+	 * given data stream, plus a {@link JobClient} to interact with the application execution.
+	 */
+	public static <OUT> ClientAndIterator<OUT> collectWithClient(
+			DataStream<OUT> stream,
+			String jobExecutionName) throws Exception {
+
 		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
-			stream.getExecutionEnvironment().getConfig());
+				stream.getExecutionEnvironment().getConfig());
 		String accumulatorName = "dataStreamCollect_" + UUID.randomUUID().toString();
 
 		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
 		CollectSinkOperatorFactory<OUT> factory = new CollectSinkOperatorFactory<>(serializer, accumulatorName);
 		CollectSinkOperator<OUT> operator = (CollectSinkOperator<OUT>) factory.getOperator();
 		CollectResultIterator<OUT> iterator = new CollectResultIterator<>(
-			operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
+				operator.getOperatorIdFuture(), serializer, accumulatorName, env.getCheckpointConfig());
 		CollectStreamSink<OUT> sink = new CollectStreamSink<>(stream, factory);
 		sink.name("Data stream collect sink");
 		env.addOperator(sink.getTransformation());
 
-		try {
-			JobClient jobClient = env.executeAsync("Data Stream Collect");
-			iterator.setJobClient(jobClient);
-		} catch (Exception e) {
-			throw new RuntimeException("Failed to execute data stream", e);
+		final JobClient jobClient = env.executeAsync(jobExecutionName);
+		iterator.setJobClient(jobClient);
+
+		return new ClientAndIterator<>(jobClient, iterator);
+	}
+
+	/**
+	 * Collects the contents of the given DataStream into a list, assuming that the stream is bounded.
+	 *
+	 * <p>This method blocks until the job execution is complete. By the time the method returns, the
+	 * job will have reached its FINISHED status.
+	 *
+	 * <p>Note that if the stream is unbounded, this method will never return and might fail with an
+	 * Out-of-Memory Error because it attempts to collect an infinite stream into a list.
+	 *
+	 * @throws Exception Exceptions that occur during the execution are forwarded.
+	 */
+	public static <E> List<E> collectBoundedStream(DataStream<E> stream, String jobName) throws Exception {
+		final ArrayList<E> list = new ArrayList<>();
+		final Iterator<E> iter = collectWithClient(stream, jobName).iterator;
+		while (iter.hasNext()) {
+			list.add(iter.next());
+		}
+		list.trimToSize();
+		return list;
+	}
+
+	/**
+	 * Triggers execution of the DataStream application and collects the given number of records from the stream.
+	 * After the records are received, the execution is canceled.
+	 */
+	public static <E> List<E> collectUnboundedStream(DataStream<E> stream, int numElements, String jobName) throws Exception {
+		final ClientAndIterator<E> clientAndIterator = collectWithClient(stream, jobName);
+		final List<E> result = collectRecordsFromUnboundedStream(clientAndIterator, numElements);
+
+		// cancel the job now that we have received enough elements
+		clientAndIterator.client.cancel().get();
+
+		return result;
+	}
+
+	public static <E> List<E> collectRecordsFromUnboundedStream(
+			final ClientAndIterator<E> client,
+			final int numElements) {
+
+		checkNotNull(client, "client");
+		checkArgument(numElements > 0, "numElements must be > 0");
+
+		final ArrayList<E> result = new ArrayList<>(numElements);
+		final Iterator<E> iterator = client.iterator;
+
+		while (iterator.hasNext()) {
+			result.add(iterator.next());
+			if (result.size() == numElements) {
+				return result;
+			}
 		}
 
-		return iterator;
+		throw new IllegalArgumentException(String.format(
+				"The stream ended before reaching the requested %d records. Only %d records were received.",
+				numElements, result.size()));
 	}
 
+	// ------------------------------------------------------------------------
+	//  Deriving a KeyedStream from a stream already partitioned by key
+	//  without a shuffle
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Reinterprets the given {@link DataStream} as a {@link KeyedStream}, which extracts keys with the given
 	 * {@link KeySelector}.
@@ -129,4 +225,21 @@ public final class DataStreamUtils {
 	 * Private constructor to prevent instantiation.
 	 */
 	private DataStreamUtils() {}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A pair of an {@link Iterator} to receive results from a streaming application and a
+	 * {@link JobClient} to interact with the program.
+	 */
+	public static final class ClientAndIterator<E> {
+
+		public final JobClient client;
+		public final Iterator<E> iterator;
+
+		ClientAndIterator(JobClient client, Iterator<E> iterator) {
+			this.client = checkNotNull(client);
+			this.iterator = checkNotNull(iterator);
+		}
+	}
 }
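
For illustration, a sketch of how the new utilities compose, using only method
names from the diff above (the CollectSketch class and job names are hypothetical):

    import java.util.Iterator;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamUtils;

    final class CollectSketch {

        static void example(DataStream<Long> bounded, DataStream<Long> unbounded) throws Exception {
            // Bounded stream: run to FINISHED and gather all elements into a list.
            List<Long> all = DataStreamUtils.collectBoundedStream(bounded, "bounded-job");

            // Unbounded stream: take the first 100 records, then cancel the job.
            List<Long> first = DataStreamUtils.collectUnboundedStream(unbounded, 100, "unbounded-job");

            // Lower-level variant: keep the JobClient for custom job control.
            DataStreamUtils.ClientAndIterator<Long> cai =
                DataStreamUtils.collectWithClient(unbounded, "manual-job");
            Iterator<Long> it = cai.iterator;
            cai.client.cancel().get();
        }
    }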


[flink] 06/10: [hotfix][testing] Add a set of parameterizable testing mocks for the Split Reader API

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ebc464c2520453a70001cd712abc8dee6ee89e0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 1 17:17:55 2020 +0200

    [hotfix][testing] Add a set of parameterizable testing mocks for the Split Reader API
    
    These utils differ from the previous mocks in that they don't hard-code ranges of values (integers)
    to emit and validate, but instead return given sets of records and collect the produced records.
    That makes them more reusable and better suited to Arrange/Act/Assert-style testing.
---
 .../reader/mocks/PassThroughRecordEmitter.java     | 33 +++++++++
 .../source/reader/mocks/TestingReaderContext.java  | 78 ++++++++++++++++++++++
 .../source/reader/mocks/TestingReaderOutput.java   | 71 ++++++++++++++++++++
 .../reader/mocks/TestingRecordsWithSplitIds.java   | 73 ++++++++++++++++++++
 .../source/reader/mocks/TestingSourceSplit.java    | 45 +++++++++++++
 .../source/reader/mocks/TestingSplitReader.java    | 65 ++++++++++++++++++
 6 files changed, 365 insertions(+)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java
new file mode 100644
index 0000000..ad61a71
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/PassThroughRecordEmitter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+
+/**
+ * A record emitter that pipes records directly into the source output.
+ */
+public final class PassThroughRecordEmitter<E, SplitStateT> implements RecordEmitter<E, E, SplitStateT> {
+
+	@Override
+	public void emitRecord(E element, SourceOutput<E> output, SplitStateT splitState) throws Exception {
+		output.collect(element);
+	}
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
new file mode 100644
index 0000000..02faf1f
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderContext.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A testing implementation of the {@link SourceReaderContext}.
+ */
+public class TestingReaderContext implements SourceReaderContext {
+
+	private final UnregisteredMetricsGroup metrics = new UnregisteredMetricsGroup();
+
+	private final Configuration config;
+
+	private final ArrayList<SourceEvent> sentEvents = new ArrayList<>();
+
+	public TestingReaderContext() {
+		this(new Configuration());
+	}
+
+	public TestingReaderContext(Configuration config) {
+		this.config = config;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public MetricGroup metricGroup() {
+		return metrics;
+	}
+
+	@Override
+	public Configuration getConfiguration() {
+		return config;
+	}
+
+	@Override
+	public String getLocalHostName() {
+		return "localhost";
+	}
+
+	@Override
+	public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
+		sentEvents.add(sourceEvent);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public List<SourceEvent> getSentEvents() {
+		return new ArrayList<>(sentEvents);
+	}
+
+	public void clearSentEvents() {
+		sentEvents.clear();
+	}
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
new file mode 100644
index 0000000..4483819
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingReaderOutput.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+
+import java.util.ArrayList;
+
+/**
+ * A {@code ReaderOutput} for testing that collects the emitted records.
+ */
+public class TestingReaderOutput<E> implements ReaderOutput<E> {
+
+	private final ArrayList<E> emittedRecords = new ArrayList<>();
+
+	@Override
+	public void collect(E record) {
+		emittedRecords.add(record);
+	}
+
+	@Override
+	public void collect(E record, long timestamp) {
+		collect(record);
+	}
+
+	@Override
+	public void emitWatermark(Watermark watermark) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void markIdle() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public SourceOutput<E> createOutputForSplit(String splitId) {
+		return this;
+	}
+
+	@Override
+	public void releaseOutputForSplit(String splitId) {}
+
+	// ------------------------------------------------------------------------
+
+	public ArrayList<E> getEmittedRecords() {
+		return emittedRecords;
+	}
+
+	public void clearEmittedRecords() {
+		emittedRecords.clear();
+	}
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
new file mode 100644
index 0000000..2a8377a
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records.
+ */
+public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
+
+	private final Map<String, Collection<E>> records;
+
+	private final String splitId;
+
+	private volatile boolean isRecycled;
+
+	@SafeVarargs
+	public TestingRecordsWithSplitIds(String splitId, E... records) {
+		this.splitId = checkNotNull(splitId);
+		this.records = new HashMap<>();
+		this.records.put(splitId, Arrays.asList(records));
+	}
+
+	@Override
+	public Collection<String> splitIds() {
+		return Collections.singleton(splitId);
+	}
+
+	@Override
+	public Map<String, Collection<E>> recordsBySplits() {
+		return records;
+	}
+
+	@Override
+	public Set<String> finishedSplits() {
+		return Collections.emptySet();
+	}
+
+	@Override
+	public void recycle() {
+		isRecycled = true;
+	}
+
+	public boolean isRecycled() {
+		return isRecycled;
+	}
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java
new file mode 100644
index 0000000..535137b
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSourceSplit.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link SourceSplit} that only has an ID.
+ */
+public class TestingSourceSplit implements SourceSplit {
+
+	private final String splitId;
+
+	public TestingSourceSplit(String splitId) {
+		this.splitId = checkNotNull(splitId);
+	}
+
+	@Override
+	public String splitId() {
+		return splitId;
+	}
+
+	@Override
+	public String toString() {
+		return splitId;
+	}
+}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
new file mode 100644
index 0000000..ede92eb
--- /dev/null
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.source.reader.mocks;
+
+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+
+/**
+ * A {@code SplitReader} that returns a pre-defined set of records (by split).
+ */
+public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitReader<E, SplitT> {
+
+	private final ArrayDeque<RecordsWithSplitIds<E>> fetches;
+
+	@SafeVarargs
+	public TestingSplitReader(RecordsWithSplitIds<E>... fetches) {
+		this.fetches = new ArrayDeque<>(fetches.length);
+		this.fetches.addAll(Arrays.asList(fetches));
+	}
+
+	@Override
+	public RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException {
+		if (!fetches.isEmpty()) {
+			return fetches.removeFirst();
+		} else {
+			// block until interrupted
+			synchronized (fetches) {
+				while (true) {
+					fetches.wait();
+				}
+			}
+		}
+	}
+
+	@Override
+	public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
+		splitsChanges.clear();
+	}
+
+	@Override
+	public void wakeUp() {}
+}
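
Taken together, the mocks support Arrange/Act/Assert-style tests along these
lines (a hypothetical sketch, not part of the commit):

    import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
    import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
    import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
    import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;

    final class MocksSketch {

        static void example() throws Exception {
            // Arrange: a split reader that serves one pre-defined fetch for "split-1".
            TestingRecordsWithSplitIds<Integer> fetch =
                new TestingRecordsWithSplitIds<>("split-1", 1, 2, 3);
            TestingSplitReader<Integer, TestingSourceSplit> reader =
                new TestingSplitReader<>(fetch);

            // Act: pull the fetch from the reader.
            RecordsWithSplitIds<Integer> records = reader.fetch();

            // Assert: the records for "split-1" are exactly the ones handed in.
            assert records.recordsBySplits().get("split-1").size() == 3;
        }
    }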


[flink] 03/10: [hotfix][core] Add to Source Enumerator convenience methods to assign single split

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 723e1790744ddc78d19c7c978442af1383f38d33
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Jun 29 16:58:00 2020 +0200

    [hotfix][core] Add to Source Enumerator convenience methods to assign single split
---
 .../flink/api/connector/source/SplitEnumeratorContext.java  | 13 +++++++++++++
 .../apache/flink/api/connector/source/SplitsAssignment.java |  8 ++++++++
 2 files changed, 21 insertions(+)

diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 5aee6dd..8ec8618 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -71,6 +71,19 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
 	void assignSplits(SplitsAssignment<SplitT> newSplitAssignments);
 
 	/**
+	 * Assigns a single split.
+	 *
+	 * <p>When assigning multiple splits, it is more efficient to assign all of them in a single
+	 * call to the {@link #assignSplits(SplitsAssignment)} method.
+	 *
+	 * @param split The new split
+	 * @param subtask The index of the operator's parallel subtask that shall receive the split.
+	 */
+	default void assignSplit(SplitT split, int subtask) {
+		assignSplits(new SplitsAssignment<>(split, subtask));
+	}
+
+	/**
 	 * Invoke the callable and hand over the return value to the handler, which will be executed
 	 * by the source coordinator. When this method is invoked multiple times, the <code>Callable</code>s
 	 * may be executed concurrently in a thread pool.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
index 6331788..5c08922 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitsAssignment.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -31,12 +33,18 @@ import java.util.Map;
  */
 @PublicEvolving
 public final class SplitsAssignment<SplitT extends SourceSplit> {
+
 	private final Map<Integer, List<SplitT>> assignment;
 
 	public SplitsAssignment(Map<Integer, List<SplitT>> assignment) {
 		this.assignment = assignment;
 	}
 
+	public SplitsAssignment(SplitT split, int subtask) {
+		this.assignment = new HashMap<>();
+		this.assignment.put(subtask, Collections.singletonList(split));
+	}
+
 	/**
 	 * @return A mapping from subtask ID to their split assignment.
 	 */
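
For illustration, a sketch of an enumerator using the new convenience method,
for example when handling the RequestSplitEvent from commit 05/10 (the
AssignSplitSketch class and its split queue are hypothetical):

    import java.util.ArrayDeque;
    import java.util.Queue;

    import org.apache.flink.api.connector.source.SourceEvent;
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.connector.base.source.event.RequestSplitEvent;

    final class AssignSplitSketch<SplitT extends SourceSplit> {

        private final Queue<SplitT> unassigned = new ArrayDeque<>();
        private final SplitEnumeratorContext<SplitT> context;

        AssignSplitSketch(SplitEnumeratorContext<SplitT> context) {
            this.context = context;
        }

        void handleSplitRequest(int subtaskId, SourceEvent event) {
            if (event instanceof RequestSplitEvent && !unassigned.isEmpty()) {
                // One split for one subtask - no need to build an assignment map.
                context.assignSplit(unassigned.poll(), subtaskId);
            }
        }
    }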


[flink] 01/10: [refactor][connectors] Backport of the connector-base exception handling from the Kafka Connector Pull Request

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e945ce8a933bc378844782f784ca473c767ca159
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 7 21:12:55 2020 +0200

    [refactor][connectors] Backport of the connector-base exception handling from the Kafka Connector Pull Request
---
 .../SingleThreadMultiplexSourceReaderBase.java     | 26 ++++-----
 .../base/source/reader/fetcher/FetchTask.java      | 11 ++--
 .../base/source/reader/fetcher/SplitFetcher.java   | 34 +++++++-----
 .../source/reader/fetcher/SplitFetcherTask.java    |  5 +-
 .../source/reader/splitreader/SplitReader.java     |  4 +-
 .../source/reader/splitreader/SplitsAddition.java  |  5 ++
 .../base/source/reader/SourceReaderBaseTest.java   | 62 +++++++++++-----------
 .../base/source/reader/SourceReaderTestBase.java   | 13 +++--
 8 files changed, 90 insertions(+), 70 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 546e20a..3239f28 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -37,21 +37,21 @@ import java.util.function.Supplier;
  * @param <SplitStateT>
  */
 public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
-		extends SourceReaderBase<E, T, SplitT, SplitStateT> {
+	extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
 	public SingleThreadMultiplexSourceReaderBase(
-			FutureNotifier futureNotifier,
-			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-			Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
-			RecordEmitter<E, T, SplitStateT> recordEmitter,
-			Configuration config,
-			SourceReaderContext context) {
+		FutureNotifier futureNotifier,
+		FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+		Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
+		RecordEmitter<E, T, SplitStateT> recordEmitter,
+		Configuration config,
+		SourceReaderContext context) {
 		super(
-				futureNotifier,
-				elementsQueue,
-				new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitFetcherSupplier),
-				recordEmitter,
-				config,
-				context);
+			futureNotifier,
+			elementsQueue,
+			new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier),
+			recordEmitter,
+			config,
+			context);
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index aff21fd..30835ce 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
 import java.util.function.Consumer;
@@ -38,10 +39,10 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	private volatile boolean wakeup;
 
 	FetchTask(
-			SplitReader<E, SplitT> splitReader,
-			BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-			Consumer<Collection<String>> splitFinishedCallback,
-			Thread runningThread) {
+		SplitReader<E, SplitT> splitReader,
+		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+		Consumer<Collection<String>> splitFinishedCallback,
+		Thread runningThread) {
 		this.splitReader = splitReader;
 		this.elementsQueue = elementsQueue;
 		this.splitFinishedCallback = splitFinishedCallback;
@@ -51,7 +52,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	}
 
 	@Override
-	public boolean run() throws InterruptedException {
+	public boolean run() throws InterruptedException, IOException {
 		try {
 			if (!isWakenUp() && lastRecords == null) {
 				lastRecords = splitReader.fetch();
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index d006bb0..35deeba 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -60,10 +61,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private volatile boolean isIdle;
 
 	SplitFetcher(
-			int id,
-			BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-			SplitReader<E, SplitT> splitReader,
-			Runnable shutdownHook) {
+		int id,
+		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+		SplitReader<E, SplitT> splitReader,
+		Runnable shutdownHook) {
 
 		this.id = id;
 		this.taskQueue = new LinkedBlockingDeque<>();
@@ -84,12 +85,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			// Remove the split from the assignments if it is already done.
 			runningThread = Thread.currentThread();
 			this.fetchTask = new FetchTask<>(
-					splitReader,
-					elementsQueue,
-					ids -> {
-						ids.forEach(assignedSplits::remove);
-						updateIsIdle();
-					}, runningThread);
+				splitReader,
+				elementsQueue,
+				ids -> {
+					ids.forEach(this::removeAssignedSplit);
+					updateIsIdle();
+				}, runningThread);
 			while (!closed.get()) {
 				runOnce();
 			}
@@ -139,8 +140,11 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 				LOG.debug("Split fetcher has been waken up.");
 			} else {
 				throw new RuntimeException(String.format(
-						"SplitFetcher thread %d interrupted while polling the records", id), ie);
+					"SplitFetcher thread %d interrupted while polling the records", id), ie);
 			}
+		} catch (IOException ioe) {
+			throw new RuntimeException(String.format(
+				"SplitFetcher thread %d received unexpected exception while polling the records", id), ioe);
 		}
 		// If the task is not null that means this task needs to be re-executed. This only
 		// happens when the task is the fetching task or the task was interrupted.
@@ -272,7 +276,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		// Only enqueue unfinished non-fetch task.
 		if (!closed.get() && isRunningTask(task) && task != fetchTask && !taskQueue.offerFirst(task)) {
 			throw new RuntimeException(
-					"The task queue is full. This is only theoretically possible when really bad thing happens.");
+				"The task queue is full. This is only theoretically possible when something really bad happens.");
 		}
 		if (task != null) {
 			LOG.debug("Enqueued task {}", task);
@@ -283,6 +287,12 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		return task != null && task != WAKEUP_TASK;
 	}
 
+	private void removeAssignedSplit(String splitId) {
+		assignedSplits.remove(splitId);
+		LOG.debug("Removed split {} from assigned splits. The assigned splits are now {}", splitId, assignedSplits);
+	}
+
 	//--------------------- Helper class ------------------
 
 	private static class DummySplitFetcherTask implements SplitFetcherTask {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
index 716d2e2..999601a 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.fetcher;
 
+import java.io.IOException;
+
 /**
  * An interface similar to {@link Runnable} but allows throwing exceptions and wakeup.
  */
@@ -31,8 +33,9 @@ public interface SplitFetcherTask {
 	 *
 	 * @return whether the runnable has successfully finished running.
 	 * @throws InterruptedException when interrupted.
+	 * @throws IOException when the performed I/O operation fails.
 	 */
-	boolean run() throws InterruptedException;
+	boolean run() throws InterruptedException, IOException;
 
 	/**
 	 * Wake up the running thread.
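
For illustration (not part of this commit), a minimal sketch of a one-shot task under the widened signature; the CommitOffsetsTask name and the IORunnable callback are made up:

    import java.io.IOException;

    class CommitOffsetsTask implements SplitFetcherTask {

        private final IORunnable committer;  // hypothetical callback that may throw IOException

        CommitOffsetsTask(IORunnable committer) {
            this.committer = committer;
        }

        @Override
        public boolean run() throws InterruptedException, IOException {
            committer.run();  // an IOException now propagates as a checked exception
            return true;      // finished successfully, no need to re-enqueue
        }

        @Override
        public void wakeUp() {}
    }

    interface IORunnable {
        void run() throws IOException;
    }

As the SplitFetcher diff above shows, such an IOException is caught in runOnce() and rethrown as a RuntimeException that fails the fetcher.
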
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 89cf81b..b980f7b 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.base.source.reader.splitreader;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
+import java.io.IOException;
 import java.util.Queue;
 
 /**
@@ -42,8 +43,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
 	 * @return the Ids of the finished splits.
 	 *
 	 * @throws InterruptedException when interrupted
+	 * @throws IOException when an I/O error is encountered, such as a deserialization failure.
 	 */
-	RecordsWithSplitIds<E> fetch() throws InterruptedException;
+	RecordsWithSplitIds<E> fetch() throws InterruptedException, IOException;
 
 	/**
 	 * Handle the split changes. This call should be non-blocking.
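
For illustration (not part of this commit), a SplitReader can now surface decoding problems through the checked signature instead of wrapping them in unchecked exceptions. A minimal sketch against the interface as it stands here; the DecodingSplitReader name and its decodeNextBatch() helper are made up:

    import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
    import org.apache.flink.connector.base.source.reader.RecordsBySplits;
    import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
    import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
    import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;

    import java.io.IOException;
    import java.util.Queue;

    public class DecodingSplitReader implements SplitReader<byte[], MockSourceSplit> {

        @Override
        public RecordsWithSplitIds<byte[]> fetch() throws InterruptedException, IOException {
            RecordsBySplits<byte[]> records = new RecordsBySplits<>();
            try {
                decodeNextBatch(records);  // connector-specific I/O and decoding
            } catch (RuntimeException e) {
                // report the failure through the new checked signature
                throw new IOException("Failed to deserialize a record", e);
            }
            return records;
        }

        @Override
        public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
            // a real reader would track the newly assigned splits here
            splitsChanges.clear();
        }

        @Override
        public void wakeUp() {}

        private void decodeNextBatch(RecordsBySplits<byte[]> records) {
            // hypothetical: pull bytes from the external system and records.add(splitId, bytes)
        }
    }
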
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
index ebd2330..e1a5650 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitsAddition.java
@@ -30,4 +30,9 @@ public class SplitsAddition<SplitT> extends SplitsChange<SplitT> {
 	public SplitsAddition(List<SplitT> splits) {
 		super(splits);
 	}
+
+	@Override
+	public String toString() {
+		return String.format("SplitsAddition:[%s]", splits());
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 26504cb..a332efe 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -53,33 +53,33 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 
 		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-				new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>(futureNotifier);
 		// We have to handle split changes first, otherwise fetch will not be called.
 		try (MockSourceReader reader = new MockSourceReader(
-				futureNotifier,
-				elementsQueue,
-				() -> new SplitReader<int[], MockSourceSplit>() {
-					@Override
-					public RecordsWithSplitIds<int[]> fetch() {
-						throw new RuntimeException(errMsg);
-					}
-
-					@Override
-					public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
-						// We have to handle split changes first, otherwise fetch will not be called.
-						splitsChanges.clear();
-					}
-
-					@Override
-					public void wakeUp() {
-					}
-				},
-				getConfig(),
-				null)) {
+			futureNotifier,
+			elementsQueue,
+			() -> new SplitReader<int[], MockSourceSplit>() {
+				@Override
+				public RecordsWithSplitIds<int[]> fetch() {
+					throw new RuntimeException(errMsg);
+				}
+
+				@Override
+				public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
+					// We have to handle split changes first, otherwise fetch will not be called.
+					splitsChanges.clear();
+				}
+
+				@Override
+				public void wakeUp() {
+				}
+			},
+			getConfig(),
+			null)) {
 			ValidatingSourceOutput output = new ValidatingSourceOutput();
 			reader.addSplits(Collections.singletonList(getSplit(0,
-					NUM_RECORDS_PER_SPLIT,
-					Boundedness.CONTINUOUS_UNBOUNDED)));
+				NUM_RECORDS_PER_SPLIT,
+				Boundedness.CONTINUOUS_UNBOUNDED)));
 			// This is not a real infinite loop; it is supposed to throw an exception after two polls.
 			while (true) {
 				reader.pollNext(output);
@@ -95,15 +95,15 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 	protected MockSourceReader createReader() {
 		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-				new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>(futureNotifier);
 		MockSplitReader mockSplitReader =
-				new MockSplitReader(2, true, true);
+			new MockSplitReader(2, true, true);
 		return new MockSourceReader(
-				futureNotifier,
-				elementsQueue,
-				() -> mockSplitReader,
-				getConfig(),
-				null);
+			futureNotifier,
+			elementsQueue,
+			() -> mockSplitReader,
+			getConfig(),
+			null);
 	}
 
 	@Override
@@ -130,7 +130,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 	}
 
 	@Override
-	protected long getIndex(MockSourceSplit split) {
+	protected long getNextRecordIndex(MockSourceSplit split) {
 		return split.index();
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
index 2acd4e1..22dda52 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderTestBase.java
@@ -130,14 +130,13 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		ValidatingSourceOutput output = new ValidatingSourceOutput();
 		// Add a split to start the fetcher.
 		List<SplitT> splits = getSplits(NUM_SPLITS, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED);
-		// Poll 5 records. That means split 0 and 1 will at index 2, split 1 will at index 1.
 		try (SourceReader<Integer, SplitT> reader =
 				consumeRecords(splits, output, NUM_SPLITS * NUM_RECORDS_PER_SPLIT)) {
 			List<SplitT> state = reader.snapshotState();
 			assertEquals("The snapshot should only have 10 splits. ", NUM_SPLITS, state.size());
 			for (int i = 0; i < NUM_SPLITS; i++) {
 				assertEquals("The first four splits should have been fully consumed.",
-						NUM_RECORDS_PER_SPLIT, getIndex(state.get(i)));
+					NUM_RECORDS_PER_SPLIT, getNextRecordIndex(state.get(i)));
 			}
 		}
 	}
@@ -150,12 +149,12 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 
 	protected abstract SplitT getSplit(int splitId, int numRecords, Boundedness boundedness);
 
-	protected abstract long getIndex(SplitT split);
+	protected abstract long getNextRecordIndex(SplitT split);
 
 	private SourceReader<Integer, SplitT> consumeRecords(
-			List<SplitT> splits,
-			ValidatingSourceOutput output,
-			int n) throws Exception {
+		List<SplitT> splits,
+		ValidatingSourceOutput output,
+		int n) throws Exception {
 		SourceReader<Integer, SplitT> reader = createReader();
 		// Add splits to start the fetcher.
 		reader.addSplits(splits);
@@ -194,7 +193,7 @@ public abstract class SourceReaderTestBase<SplitT extends SourceSplit> extends T
 		public void validate() {
 
 			assertEquals(String.format("Should be %d distinct elements in total", TOTAL_NUM_RECORDS),
-					TOTAL_NUM_RECORDS, consumedValues.size());
+				TOTAL_NUM_RECORDS, consumedValues.size());
 			assertEquals(String.format("Should be %d elements in total", TOTAL_NUM_RECORDS), TOTAL_NUM_RECORDS, count);
 			assertEquals("The min value should be 0", 0, min);
 			assertEquals("The max value should be " + (TOTAL_NUM_RECORDS - 1), TOTAL_NUM_RECORDS - 1, max);


[flink] 07/10: [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3d273de822b085183d09b275a445879ff94b350
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 1 17:19:52 2020 +0200

    [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects.
---
 .../base/source/reader/RecordsWithSplitIds.java    |  9 +++
 .../base/source/reader/SourceReaderBase.java       |  1 +
 .../base/source/reader/SplitsRecordIterator.java   | 14 ++--
 .../base/source/reader/SourceReaderBaseTest.java   | 79 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index f616125..dc915b3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -47,4 +47,13 @@ public interface RecordsWithSplitIds<E> {
 	 * @return the finished splits after this RecordsWithSplitIds is returned.
 	 */
 	Set<String> finishedSplits();
+
+	/**
+	 * This method is called when all records from this batch have been emitted.
+	 *
+	 * <p>Overriding this method gives implementations the opportunity to recycle/reuse this object,
+	 * which is a performance optimization that is important for cases where the record objects are
+	 * large or otherwise heavy to allocate.
+	 */
+	default void recycle() {}
 }
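
For illustration (not part of this commit), an implementation might use the new hook to return batch objects to a pool so their internal collections can be reused across fetches. A sketch against the interface as it stands in this commit; the PooledBatch class and its pool field are made up:

    import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;

    import java.util.ArrayDeque;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public class PooledBatch<E> implements RecordsWithSplitIds<E> {

        private final ArrayDeque<PooledBatch<E>> pool;
        private final Map<String, Collection<E>> records = new HashMap<>();

        PooledBatch(ArrayDeque<PooledBatch<E>> pool) {
            this.pool = pool;
        }

        @Override
        public Collection<String> splitIds() {
            return records.keySet();
        }

        @Override
        public Map<String, Collection<E>> recordsBySplits() {
            return records;
        }

        @Override
        public Set<String> finishedSplits() {
            return Collections.emptySet();
        }

        @Override
        public void recycle() {
            // invoked by the reader once every record of this batch has been emitted
            records.clear();
            synchronized (pool) {
                pool.push(this);
            }
        }
    }
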
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index e01180e..4a41d49 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -149,6 +149,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				});
 				// Handle the finished splits.
 				onSplitFinished(splitIter.finishedSplitIds());
+				splitIter.dispose();
 				// Prepare the return status based on the availability of the next element.
 				status = finishedOrAvailableLater();
 			} else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
index d7b7b76..c83bec0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
@@ -28,8 +28,8 @@ import java.util.Set;
  * A class that wraps around a {@link RecordsWithSplitIds} and provides a consistent iterator.
  */
 public class SplitsRecordIterator<E> {
-	private final Map<String, Collection<E>> recordsBySplits;
-	private final Set<String> finishedSplitIds;
+
+	private final RecordsWithSplitIds<E> recordsWithSplitIds;
 	private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
 	private String currentSplitId;
 	private Iterator<E> recordsIter;
@@ -40,11 +40,11 @@ public class SplitsRecordIterator<E> {
 	 * @param recordsWithSplitIds the records by splits.
 	 */
 	public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
-		this.recordsBySplits = recordsWithSplitIds.recordsBySplits();
+		this.recordsWithSplitIds = recordsWithSplitIds;
+		Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
 		// Remove empty splits;
 		recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
 		this.splitIter = recordsBySplits.entrySet().iterator();
-		this.finishedSplitIds = recordsWithSplitIds.finishedSplits();
 	}
 
 	/**
@@ -91,6 +91,10 @@ public class SplitsRecordIterator<E> {
 	 * @return a set of finished split Ids.
 	 */
 	public Set<String> finishedSplitIds() {
-		return finishedSplitIds;
+		return recordsWithSplitIds.finishedSplits();
+	}
+
+	public void dispose() {
+		recordsWithSplitIds.recycle();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index a332efe..0ec4297 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -19,10 +19,17 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
+import org.apache.flink.connector.base.source.reader.mocks.TestingReaderContext;
+import org.apache.flink.connector.base.source.reader.mocks.TestingReaderOutput;
+import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -33,10 +40,14 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A unit test class for {@link SourceReaderBase}.
  */
@@ -89,6 +100,29 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		}
 	}
 
+	@Test
+	public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
+		final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+		final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+
+		reader.pollNext(new TestingReaderOutput<>());
+
+		assertFalse(records.isRecycled());
+	}
+
+	@Test
+	public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
+		final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+		final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+
+		// poll three times: twice to get all records, once more to trigger the recycle and move on to the next split
+		reader.pollNext(new TestingReaderOutput<>());
+		reader.pollNext(new TestingReaderOutput<>());
+		reader.pollNext(new TestingReaderOutput<>());
+
+		assertTrue(records.isRecycled());
+	}
+
 	// ---------------- helper methods -----------------
 
 	@Override
@@ -140,4 +174,49 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
 		return config;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Testing Setup Helpers
+	// ------------------------------------------------------------------------
+
+	private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
+		final String splitId,
+		final RecordsWithSplitIds<E> records) throws Exception {
+
+		final FutureNotifier futureNotifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
+			new FutureCompletingBlockingQueue<>(futureNotifier);
+
+		final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
+			futureNotifier,
+			elementsQueue,
+			() -> new TestingSplitReader<E, TestingSourceSplit>(records),
+			new PassThroughRecordEmitter<E, TestingSourceSplit>(),
+			new Configuration(),
+			new TestingReaderContext()) {
+
+			@Override
+			protected void onSplitFinished(Collection<String> finishedSplitIds) {
+			}
+
+			@Override
+			protected TestingSourceSplit initializedState(TestingSourceSplit split) {
+				return split;
+			}
+
+			@Override
+			protected TestingSourceSplit toSplitType(String splitId, TestingSourceSplit splitState) {
+				return splitState;
+			}
+		};
+
+		reader.start();
+
+		final List<TestingSourceSplit> splits = Collections.singletonList(new TestingSourceSplit(splitId));
+		reader.addSplits(splits);
+
+		reader.isAvailable().get();
+
+		return reader;
+	}
 }


[flink] 10/10: [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f42a3ebc3e81a034b7221a803c153636fef34903
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 7 00:20:38 2020 +0200

    [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.
    
    This turns the RecordsWithSplitIds structure from a holder of materialized collections to
    a simple iterator-like structure to allow for lazy materialization and object reuse.
---
 .../base/source/reader/RecordsBySplits.java        | 179 +++++++++++++--------
 .../base/source/reader/RecordsWithSplitIds.java    |  20 +--
 .../base/source/reader/SourceReaderBase.java       | 117 +++++++++-----
 .../base/source/reader/SplitsRecordIterator.java   | 100 ------------
 .../source/reader/fetcher/SplitFetcherTest.java    |  10 +-
 .../base/source/reader/mocks/MockSplitReader.java  |   5 +-
 .../reader/mocks/TestingRecordsWithSplitIds.java   |  32 +---
 7 files changed, 212 insertions(+), 251 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index 77cb594..0b15432 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -20,83 +20,59 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An implementation of RecordsWithSplitIds to host all the records by splits.
  */
 public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
-	private Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
-	private Set<String> finishedSplits = new HashSet<>();
 
-	/**
-	 * Add the record from the given split ID.
-	 *
-	 * @param splitId the split ID the record was from.
-	 * @param record the record to add.
-	 */
-	public void add(String splitId, E record) {
-		recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
-	}
+	private final Set<String> finishedSplits;
 
-	/**
-	 * Add the record from the given source split.
-	 *
-	 * @param split the source split the record was from.
-	 * @param record the record to add.
-	 */
-	public void add(SourceSplit split, E record) {
-		add(split.splitId(), record);
-	}
+	private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator;
 
-	/**
-	 * Add multiple records from the given split ID.
-	 *
-	 * @param splitId the split ID given the records were from.
-	 * @param records the records to add.
-	 */
-	public void addAll(String splitId, Collection<E> records) {
-		this.recordsBySplits.compute(splitId, (id, r) -> {
-			if (r == null) {
-				r = records;
-			} else {
-				r.addAll(records);
-			}
-			return r;
-		});
-	}
+	@Nullable
+	private Iterator<E> recordsInCurrentSplit;
 
-	/**
-	 * Add multiple records from the given source split.
-	 *
-	 * @param split the source split the records were from.
-	 * @param records the records to add.
-	 */
-	public void addAll(SourceSplit split, Collection<E> records) {
-		addAll(split.splitId(), records);
+	public RecordsBySplits(
+			final Map<String, Collection<E>> recordsBySplit,
+			final Set<String> finishedSplits) {
+
+		this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator();
+		this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
 	}
 
-	/**
-	 * Mark the split with the given ID as finished.
-	 *
-	 * @param splitId the ID of the finished split.
-	 */
-	public void addFinishedSplit(String splitId) {
-		finishedSplits.add(splitId);
+	@Nullable
+	@Override
+	public String nextSplit() {
+		if (splitsIterator.hasNext()) {
+			final Map.Entry<String, Collection<E>> next = splitsIterator.next();
+			recordsInCurrentSplit = next.getValue().iterator();
+			return next.getKey();
+		} else {
+			return null;
+		}
 	}
 
-	/**
-	 * Mark multiple splits with the given IDs as finished.
-	 *
-	 * @param splitIds the IDs of the finished splits.
-	 */
-	public void addFinishedSplits(Collection<String> splitIds) {
-		finishedSplits.addAll(splitIds);
+	@Nullable
+	@Override
+	public E nextRecordFromSplit() {
+		if (recordsInCurrentSplit == null) {
+			throw new IllegalStateException();
+		}
+
+		return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null;
 	}
 
 	@Override
@@ -104,13 +80,86 @@ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
 		return finishedSplits;
 	}
 
-	@Override
-	public Collection<String> splitIds() {
-		return recordsBySplits.keySet();
-	}
+	// ------------------------------------------------------------------------
 
-	@Override
-	public Map<String, Collection<E>> recordsBySplits() {
-		return recordsBySplits;
+	/**
+	 * A utility builder to collect records in individual calls, rather than putting a finished collection
+	 * in the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor.
+	 */
+	public static class Builder<E> {
+
+		private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
+		private final Set<String> finishedSplits = new HashSet<>(2);
+
+		/**
+		 * Add the record from the given split ID.
+		 *
+		 * @param splitId the split ID the record was from.
+		 * @param record the record to add.
+		 */
+		public void add(String splitId, E record) {
+			recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
+		}
+
+		/**
+		 * Add the record from the given source split.
+		 *
+		 * @param split the source split the record was from.
+		 * @param record the record to add.
+		 */
+		public void add(SourceSplit split, E record) {
+			add(split.splitId(), record);
+		}
+
+		/**
+		 * Add multiple records from the given split ID.
+		 *
+		 * @param splitId the split ID the records came from.
+		 * @param records the records to add.
+		 */
+		public void addAll(String splitId, Collection<E> records) {
+			this.recordsBySplits.compute(splitId, (id, r) -> {
+				if (r == null) {
+					r = records;
+				} else {
+					r.addAll(records);
+				}
+				return r;
+			});
+		}
+
+		/**
+		 * Add multiple records from the given source split.
+		 *
+		 * @param split the source split the records were from.
+		 * @param records the records to add.
+		 */
+		public void addAll(SourceSplit split, Collection<E> records) {
+			addAll(split.splitId(), records);
+		}
+
+		/**
+		 * Mark the split with the given ID as finished.
+		 *
+		 * @param splitId the ID of the finished split.
+		 */
+		public void addFinishedSplit(String splitId) {
+			finishedSplits.add(splitId);
+		}
+
+		/**
+		 * Mark multiple splits with the given IDs as finished.
+		 *
+		 * @param splitIds the IDs of the finished splits.
+		 */
+		public void addFinishedSplits(Collection<String> splitIds) {
+			finishedSplits.addAll(splitIds);
+		}
+
+		public RecordsBySplits<E> build() {
+			return new RecordsBySplits<>(
+				recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits,
+				finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits);
+		}
 	}
 }
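
For illustration (not part of this commit), assembling a batch with the new builder could look like the sketch below; the split IDs and values are made up:

    import java.util.Arrays;

    static RecordsWithSplitIds<String> buildBatch() {
        RecordsBySplits.Builder<String> builder = new RecordsBySplits.Builder<>();
        builder.add("split-0", "value-1");
        builder.addAll("split-1", Arrays.asList("value-2", "value-3"));
        builder.addFinishedSplit("split-1");  // no further records in split-1
        return builder.build();
    }
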
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index dc915b3..0c8fd07 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader;
 
-import java.util.Collection;
-import java.util.Map;
+import javax.annotation.Nullable;
+
 import java.util.Set;
 
 /**
@@ -28,18 +28,18 @@ import java.util.Set;
 public interface RecordsWithSplitIds<E> {
 
 	/**
-	 * Get all the split ids.
-	 *
-	 * @return a collection of split ids.
+	 * Moves to the next split. This method is also called initially to move to the
+	 * first split. Returns null if no splits are left.
 	 */
-	Collection<String> splitIds();
+	@Nullable
+	String nextSplit();
 
 	/**
-	 * Get all the records by Splits.
-	 *
-	 * @return a mapping from split ids to the records.
+	 * Gets the next record from the current split. Returns null if no more records are left
+	 * in this split.
 	 */
-	Map<String, Collection<E>> recordsBySplits();
+	@Nullable
+	E nextRecordFromSplit();
 
 	/**
 	 * Get the finished splits.
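
For illustration (not part of this commit), the consumption pattern the reworked interface implies: advance split by split, then record by record, with null signaling exhaustion (the updated SplitFetcherTest below uses the same pattern). The drain() method and its process() stand-in for the record emitter are made up:

    static void drain(RecordsWithSplitIds<String> batch) {
        String splitId;
        while ((splitId = batch.nextSplit()) != null) {
            String record;
            while ((record = batch.nextRecordFromSplit()) != null) {
                process(splitId, record);  // hypothetical per-record handling
            }
        }
        batch.recycle();  // every record was emitted, allow reuse of the batch
    }

    static void process(String splitId, String record) {
        // hypothetical sink for the emitted record
    }
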
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 4a41d49..02b7a7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -35,14 +35,19 @@ import org.apache.flink.core.io.InputStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * An abstract implementation of {@link SourceReader} which provides some synchronization between
  * the mailbox main thread and the SourceReader internal threads. This class allows users to
@@ -81,8 +86,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	/** The context of this source reader. */
 	protected SourceReaderContext context;
 
-	/** The last element to ensure it is fully handled. */
-	private SplitsRecordIterator<E> splitIter;
+	/** The latest fetched batch of records-by-split from the split reader. */
+	@Nullable private RecordsWithSplitIds<E> currentFetch;
+	@Nullable private SplitContext<T, SplitStateT> currentSplitContext;
+	@Nullable private SourceOutput<T> currentSplitOutput;
 
 	/** Indicating whether the SourceReader will be assigned more splits or not.*/
 	private boolean noMoreSplitsAssignment;
@@ -99,7 +106,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		this.splitFetcherManager = splitFetcherManager;
 		this.recordEmitter = recordEmitter;
 		this.splitStates = new HashMap<>();
-		this.splitIter = null;
 		this.options = new SourceReaderOptions(config);
 		this.config = config;
 		this.context = context;
@@ -107,60 +113,87 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public void start() {
-
-	}
+	public void start() {}
 
 	@Override
 	public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
 		splitFetcherManager.checkErrors();
-		// poll from the queue if the last element was successfully handled. Otherwise
-		// just pass the last element again.
-		RecordsWithSplitIds<E> recordsWithSplitId = null;
-		boolean newFetch = splitIter == null || !splitIter.hasNext();
-		if (newFetch) {
-			recordsWithSplitId = elementsQueue.poll();
+
+		// make sure we have a fetch we are working on, or move to the next
+		final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output);
+		if (recordsWithSplitId == null) {
+			return trace(finishedOrAvailableLater());
 		}
 
-		InputStatus status;
-		if (newFetch && recordsWithSplitId == null) {
-			// No element available, set to available later if needed.
-			status = finishedOrAvailableLater();
-		} else {
-			// Update the record iterator if it is a new fetch.
-			if (newFetch) {
-				splitIter = new SplitsRecordIterator<>(recordsWithSplitId);
-			}
+		// we need to loop here, because a fetch may span multiple splits
+		while (true) {
 			// Process one record.
-			if (splitIter.hasNext()) {
+			final E record = recordsWithSplitId.nextRecordFromSplit();
+			if (record != null) {
 				// emit the record.
-				final E record = splitIter.next();
-				final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
-				final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
-				recordEmitter.emitRecord(record, splitOutput, splitContext.state);
+				recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
 				LOG.trace("Emitted record: {}", record);
+				return trace(InputStatus.MORE_AVAILABLE);
 			}
-			// Do some cleanup if the all the records in the current splitIter have been processed.
-			if (!splitIter.hasNext()) {
-				// First remove the state of the split.
-				splitIter.finishedSplitIds().forEach((id) -> {
-					splitStates.remove(id);
-					output.releaseOutputForSplit(id);
-				});
-				// Handle the finished splits.
-				onSplitFinished(splitIter.finishedSplitIds());
-				splitIter.dispose();
-				// Prepare the return status based on the availability of the next element.
-				status = finishedOrAvailableLater();
-			} else {
-				// There are more records from the current splitIter.
-				status = InputStatus.MORE_AVAILABLE;
+			else if (!moveToNextSplit(recordsWithSplitId, output)) {
+				return trace(finishedOrAvailableLater());
 			}
+			// else fall through the loop
 		}
+	}
+
+	private InputStatus trace(InputStatus status) {
 		LOG.trace("Source reader status: {}", status);
 		return status;
 	}
 
+	@Nullable
+	private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) {
+		RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
+		if (recordsWithSplitId != null) {
+			return recordsWithSplitId;
+		}
+
+		recordsWithSplitId = elementsQueue.poll();
+		if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
+			// No element available, set to available later if needed.
+			return null;
+		}
+
+		currentFetch = recordsWithSplitId;
+		return recordsWithSplitId;
+	}
+
+	private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) {
+		currentFetch = null;
+		currentSplitContext = null;
+		currentSplitOutput = null;
+
+		final Set<String> finishedSplits = fetch.finishedSplits();
+		if (!finishedSplits.isEmpty()) {
+			for (String finishedSplitId : finishedSplits) {
+				splitStates.remove(finishedSplitId);
+				output.releaseOutputForSplit(finishedSplitId);
+			}
+			onSplitFinished(finishedSplits);
+		}
+
+		fetch.recycle();
+	}
+
+	private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
+		final String nextSplitId = recordsWithSplitIds.nextSplit();
+		if (nextSplitId == null) {
+			finishCurrentFetch(recordsWithSplitIds, output);
+			return false;
+		}
+
+		currentSplitContext = splitStates.get(nextSplitId);
+		checkState(currentSplitContext != null, "Have records for a split that was not registered");
+		currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
+		return true;
+	}
+
 	@Override
 	public CompletableFuture<Void> isAvailable() {
 		// The order matters here. We first get the future. After this point, if the queue
@@ -235,7 +268,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	private InputStatus finishedOrAvailableLater() {
 		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-		boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
+		boolean allElementsEmitted = elementsQueue.isEmpty();
 		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
 			return InputStatus.END_OF_INPUT;
 		} else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
deleted file mode 100644
index c83bec0..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator.
- */
-public class SplitsRecordIterator<E> {
-
-	private final RecordsWithSplitIds<E> recordsWithSplitIds;
-	private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
-	private String currentSplitId;
-	private Iterator<E> recordsIter;
-
-	/**
-	 * Construct a cross-splits iterator for the records.
-	 *
-	 * @param recordsWithSplitIds the records by splits.
-	 */
-	public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
-		this.recordsWithSplitIds = recordsWithSplitIds;
-		Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
-		// Remove empty splits;
-		recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
-		this.splitIter = recordsBySplits.entrySet().iterator();
-	}
-
-	/**
-	 * Whether their are more records available.
-	 *
-	 * @return true if there are more records, false otherwise.
-	 */
-	public boolean hasNext() {
-		if (recordsIter == null || !recordsIter.hasNext()) {
-			if (splitIter.hasNext()) {
-				Map.Entry<String, Collection<E>> entry = splitIter.next();
-				currentSplitId = entry.getKey();
-				recordsIter = entry.getValue().iterator();
-			} else {
-				return false;
-			}
-		}
-		return recordsIter.hasNext() || splitIter.hasNext();
-	}
-
-	/**
-	 * Get the next record.
-	 * @return the next record.
-	 */
-	public E next() {
-		if (!hasNext()) {
-			throw new NoSuchElementException();
-		}
-		return recordsIter.next();
-	}
-
-	/**
-	 * Get the split id of the last returned record.
-	 *
-	 * @return the split id of the last returned record.
-	 */
-	public String currentSplitId() {
-		return currentSplitId;
-	}
-
-	/**
-	 * The split Ids that are finished after all the records in this iterator are emitted.
-	 *
-	 * @return a set of finished split Ids.
-	 */
-	public Set<String> finishedSplitIds() {
-		return recordsWithSplitIds.finishedSplits();
-	}
-
-	public void dispose() {
-		recordsWithSplitIds.recycle();
-	}
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 953d7aa..e9c2ad2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -95,9 +95,13 @@ public class SplitFetcherTest {
 			interrupter.start();
 
 			while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
-				elementQueue.take().recordsBySplits().values().forEach(records ->
-						// Ensure there is no duplicate records.
-						records.forEach(arr -> assertTrue(recordsRead.add(arr[0]))));
+				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
+				while (nextBatch.nextSplit() != null) {
+					int[] arr;
+					while ((arr = nextBatch.nextRecordFromSplit()) != null) {
+						assertTrue(recordsRead.add(arr[0]));
+					}
+				}
 			}
 
 			assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 3c6d8df..00d4d71 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -81,7 +81,8 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	}
 
 	private RecordsBySplits<int[]> getRecords() {
-		RecordsBySplits<int[]> records = new RecordsBySplits<>();
+		final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
+
 		try {
 			for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
 				MockSourceSplit split = entry.getValue();
@@ -102,6 +103,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 				throw new RuntimeException("Caught unexpected interrupted exception.");
 			}
 		}
-		return records;
+		return records.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
index 2a8377a..3aa49ac 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -18,48 +18,22 @@
 
 package org.apache.flink.connector.base.source.reader.mocks;
 
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records.
  */
-public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
-
-	private final Map<String, Collection<E>> records;
-
-	private final String splitId;
+public class TestingRecordsWithSplitIds<E> extends RecordsBySplits<E> {
 
 	private volatile boolean isRecycled;
 
 	@SafeVarargs
 	public TestingRecordsWithSplitIds(String splitId, E... records) {
-		this.splitId = checkNotNull(splitId);
-		this.records = new HashMap<>();
-		this.records.put(splitId, Arrays.asList(records));
-	}
-
-	@Override
-	public Collection<String> splitIds() {
-		return Collections.singleton(splitId);
-	}
-
-	@Override
-	public Map<String, Collection<E>> recordsBySplits() {
-		return records;
-	}
-
-	@Override
-	public Set<String> finishedSplits() {
-		return Collections.emptySet();
+		super(Collections.singletonMap(splitId, Arrays.asList(records)), Collections.emptySet());
 	}
 
 	@Override