You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:40 UTC

[flink] 10/10: [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f42a3ebc3e81a034b7221a803c153636fef34903
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 7 00:20:38 2020 +0200

    [FLINK-18680][connectors] Make connector base RecordsWithSplitIds more lightweight.
    
    This turns the RecordsWithSplitIds structure from a holder of materialized collections to
    a simple iterator-like structure to allow for lazy materialization and object reuse.
---
 .../base/source/reader/RecordsBySplits.java        | 179 +++++++++++++--------
 .../base/source/reader/RecordsWithSplitIds.java    |  20 +--
 .../base/source/reader/SourceReaderBase.java       | 117 +++++++++-----
 .../base/source/reader/SplitsRecordIterator.java   | 100 ------------
 .../source/reader/fetcher/SplitFetcherTest.java    |  10 +-
 .../base/source/reader/mocks/MockSplitReader.java  |   5 +-
 .../reader/mocks/TestingRecordsWithSplitIds.java   |  32 +---
 7 files changed, 212 insertions(+), 251 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
index 77cb594..0b15432 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.java
@@ -20,83 +20,59 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.SourceSplit;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An implementation of RecordsWithSplitIds to host all the records by splits.
  */
 public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
-	private Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
-	private Set<String> finishedSplits = new HashSet<>();
 
-	/**
-	 * Add the record from the given split ID.
-	 *
-	 * @param splitId the split ID the record was from.
-	 * @param record the record to add.
-	 */
-	public void add(String splitId, E record) {
-		recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
-	}
+	private final Set<String> finishedSplits;
 
-	/**
-	 * Add the record from the given source split.
-	 *
-	 * @param split the source split the record was from.
-	 * @param record the record to add.
-	 */
-	public void add(SourceSplit split, E record) {
-		add(split.splitId(), record);
-	}
+	private final Iterator<Map.Entry<String, Collection<E>>> splitsIterator;
 
-	/**
-	 * Add multiple records from the given split ID.
-	 *
-	 * @param splitId the split ID given the records were from.
-	 * @param records the records to add.
-	 */
-	public void addAll(String splitId, Collection<E> records) {
-		this.recordsBySplits.compute(splitId, (id, r) -> {
-			if (r == null) {
-				r = records;
-			} else {
-				r.addAll(records);
-			}
-			return r;
-		});
-	}
+	@Nullable
+	private Iterator<E> recordsInCurrentSplit;
 
-	/**
-	 * Add multiple records from the given source split.
-	 *
-	 * @param split the source split the records were from.
-	 * @param records the records to add.
-	 */
-	public void addAll(SourceSplit split, Collection<E> records) {
-		addAll(split.splitId(), records);
+	public RecordsBySplits(
+			final Map<String, Collection<E>> recordsBySplit,
+			final Set<String> finishedSplits) {
+
+		this.splitsIterator = checkNotNull(recordsBySplit, "recordsBySplit").entrySet().iterator();
+		this.finishedSplits = checkNotNull(finishedSplits, "finishedSplits");
 	}
 
-	/**
-	 * Mark the split with the given ID as finished.
-	 *
-	 * @param splitId the ID of the finished split.
-	 */
-	public void addFinishedSplit(String splitId) {
-		finishedSplits.add(splitId);
+	@Nullable
+	@Override
+	public String nextSplit() {
+		if (splitsIterator.hasNext()) {
+			final Map.Entry<String, Collection<E>> next = splitsIterator.next();
+			recordsInCurrentSplit = next.getValue().iterator();
+			return next.getKey();
+		} else {
+			return null;
+		}
 	}
 
-	/**
-	 * Mark multiple splits with the given IDs as finished.
-	 *
-	 * @param splitIds the IDs of the finished splits.
-	 */
-	public void addFinishedSplits(Collection<String> splitIds) {
-		finishedSplits.addAll(splitIds);
+	@Nullable
+	@Override
+	public E nextRecordFromSplit() {
+		if (recordsInCurrentSplit == null) {
+			throw new IllegalStateException();
+		}
+
+		return recordsInCurrentSplit.hasNext() ? recordsInCurrentSplit.next() : null;
 	}
 
 	@Override
@@ -104,13 +80,86 @@ public class RecordsBySplits<E> implements RecordsWithSplitIds<E> {
 		return finishedSplits;
 	}
 
-	@Override
-	public Collection<String> splitIds() {
-		return recordsBySplits.keySet();
-	}
+	// ------------------------------------------------------------------------
 
-	@Override
-	public Map<String, Collection<E>> recordsBySplits() {
-		return recordsBySplits;
+	/**
+	 * A utility builder to collect records in individual calls, rather than passing a finished
+	 * collection to the {@link RecordsBySplits#RecordsBySplits(Map, Set)} constructor.
+	 */
+	public static class Builder<E> {
+
+		private final Map<String, Collection<E>> recordsBySplits = new LinkedHashMap<>();
+		private final Set<String> finishedSplits = new HashSet<>(2);
+
+		/**
+		 * Add the record from the given split ID.
+		 *
+		 * @param splitId the split ID the record was from.
+		 * @param record the record to add.
+		 */
+		public void add(String splitId, E record) {
+			recordsBySplits.computeIfAbsent(splitId, sid -> new ArrayList<>()).add(record);
+		}
+
+		/**
+		 * Add the record from the given source split.
+		 *
+		 * @param split the source split the record was from.
+		 * @param record the record to add.
+		 */
+		public void add(SourceSplit split, E record) {
+			add(split.splitId(), record);
+		}
+
+		/**
+		 * Add multiple records from the given split ID.
+		 *
+		 * @param splitId the split ID the records were from.
+		 * @param records the records to add.
+		 */
+		public void addAll(String splitId, Collection<E> records) {
+			this.recordsBySplits.compute(splitId, (id, r) -> {
+				if (r == null) {
+					r = records;
+				} else {
+					r.addAll(records);
+				}
+				return r;
+			});
+		}
+
+		/**
+		 * Add multiple records from the given source split.
+		 *
+		 * @param split the source split the records were from.
+		 * @param records the records to add.
+		 */
+		public void addAll(SourceSplit split, Collection<E> records) {
+			addAll(split.splitId(), records);
+		}
+
+		/**
+		 * Mark the split with the given ID as finished.
+		 *
+		 * @param splitId the ID of the finished split.
+		 */
+		public void addFinishedSplit(String splitId) {
+			finishedSplits.add(splitId);
+		}
+
+		/**
+		 * Mark multiple splits with the given IDs as finished.
+		 *
+		 * @param splitIds the IDs of the finished splits.
+		 */
+		public void addFinishedSplits(Collection<String> splitIds) {
+			finishedSplits.addAll(splitIds);
+		}
+
+		public RecordsBySplits<E> build() {
+			return new RecordsBySplits<>(
+				recordsBySplits.isEmpty() ? Collections.emptyMap() : recordsBySplits,
+				finishedSplits.isEmpty() ? Collections.emptySet() : finishedSplits);
+		}
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index dc915b3..0c8fd07 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader;
 
-import java.util.Collection;
-import java.util.Map;
+import javax.annotation.Nullable;
+
 import java.util.Set;
 
 /**
@@ -28,18 +28,18 @@ import java.util.Set;
 public interface RecordsWithSplitIds<E> {
 
 	/**
-	 * Get all the split ids.
-	 *
-	 * @return a collection of split ids.
+	 * Moves to the next split. This method is also called initially to move to the
+	 * first split. Returns null if no splits are left.
 	 */
-	Collection<String> splitIds();
+	@Nullable
+	String nextSplit();
 
 	/**
-	 * Get all the records by Splits.
-	 *
-	 * @return a mapping from split ids to the records.
+	 * Gets the next record from the current split. Returns null if no more records are left
+	 * in this split.
 	 */
-	Map<String, Collection<E>> recordsBySplits();
+	@Nullable
+	E nextRecordFromSplit();
 
 	/**
 	 * Get the finished splits.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 4a41d49..02b7a7c 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -35,14 +35,19 @@ import org.apache.flink.core.io.InputStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /**
  * An abstract implementation of {@link SourceReader} which provides some sychronization between
  * the mail box main thread and the SourceReader internal threads. This class allows user to
@@ -81,8 +86,10 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	/** The context of this source reader. */
 	protected SourceReaderContext context;
 
-	/** The last element to ensure it is fully handled. */
-	private SplitsRecordIterator<E> splitIter;
+	/** The latest fetched batch of records-by-split from the split reader. */
+	@Nullable private RecordsWithSplitIds<E> currentFetch;
+	@Nullable private SplitContext<T, SplitStateT> currentSplitContext;
+	@Nullable private SourceOutput<T> currentSplitOutput;
 
 	/** Indicating whether the SourceReader will be assigned more splits or not.*/
 	private boolean noMoreSplitsAssignment;
@@ -99,7 +106,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		this.splitFetcherManager = splitFetcherManager;
 		this.recordEmitter = recordEmitter;
 		this.splitStates = new HashMap<>();
-		this.splitIter = null;
 		this.options = new SourceReaderOptions(config);
 		this.config = config;
 		this.context = context;
@@ -107,60 +113,87 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Override
-	public void start() {
-
-	}
+	public void start() {}
 
 	@Override
 	public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
 		splitFetcherManager.checkErrors();
-		// poll from the queue if the last element was successfully handled. Otherwise
-		// just pass the last element again.
-		RecordsWithSplitIds<E> recordsWithSplitId = null;
-		boolean newFetch = splitIter == null || !splitIter.hasNext();
-		if (newFetch) {
-			recordsWithSplitId = elementsQueue.poll();
+
+		// make sure we have a fetch we are working on, or move to the next
+		final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output);
+		if (recordsWithSplitId == null) {
+			return trace(finishedOrAvailableLater());
 		}
 
-		InputStatus status;
-		if (newFetch && recordsWithSplitId == null) {
-			// No element available, set to available later if needed.
-			status = finishedOrAvailableLater();
-		} else {
-			// Update the record iterator if it is a new fetch.
-			if (newFetch) {
-				splitIter = new SplitsRecordIterator<>(recordsWithSplitId);
-			}
+		// we need to loop here, because we may have to go across splits
+		while (true) {
 			// Process one record.
-			if (splitIter.hasNext()) {
+			final E record = recordsWithSplitId.nextRecordFromSplit();
+			if (record != null) {
 				// emit the record.
-				final E record = splitIter.next();
-				final SplitContext<T, SplitStateT> splitContext = splitStates.get(splitIter.currentSplitId());
-				final SourceOutput<T> splitOutput = splitContext.getOrCreateSplitOutput(output);
-				recordEmitter.emitRecord(record, splitOutput, splitContext.state);
+				recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
 				LOG.trace("Emitted record: {}", record);
+				return trace(InputStatus.MORE_AVAILABLE);
 			}
-			// Do some cleanup if the all the records in the current splitIter have been processed.
-			if (!splitIter.hasNext()) {
-				// First remove the state of the split.
-				splitIter.finishedSplitIds().forEach((id) -> {
-					splitStates.remove(id);
-					output.releaseOutputForSplit(id);
-				});
-				// Handle the finished splits.
-				onSplitFinished(splitIter.finishedSplitIds());
-				splitIter.dispose();
-				// Prepare the return status based on the availability of the next element.
-				status = finishedOrAvailableLater();
-			} else {
-				// There are more records from the current splitIter.
-				status = InputStatus.MORE_AVAILABLE;
+			else if (!moveToNextSplit(recordsWithSplitId, output)) {
+				return trace(finishedOrAvailableLater());
 			}
+			// else fall through the loop
 		}
+	}
+
+	private InputStatus trace(InputStatus status) {
 		LOG.trace("Source reader status: {}", status);
 		return status;
 	}
 
+	@Nullable
+	private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) {
+		RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
+		if (recordsWithSplitId != null) {
+			return recordsWithSplitId;
+		}
+
+		recordsWithSplitId = elementsQueue.poll();
+		if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
+			// No element available, set to available later if needed.
+			return null;
+		}
+
+		currentFetch = recordsWithSplitId;
+		return recordsWithSplitId;
+	}
+
+	private void finishCurrentFetch(final RecordsWithSplitIds<E> fetch, final ReaderOutput<T> output) {
+		currentFetch = null;
+		currentSplitContext = null;
+		currentSplitOutput = null;
+
+		final Set<String> finishedSplits = fetch.finishedSplits();
+		if (!finishedSplits.isEmpty()) {
+			for (String finishedSplitId : finishedSplits) {
+				splitStates.remove(finishedSplitId);
+				output.releaseOutputForSplit(finishedSplitId);
+			}
+			onSplitFinished(finishedSplits);
+		}
+
+		fetch.recycle();
+	}
+
+	private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
+		final String nextSplitId = recordsWithSplitIds.nextSplit();
+		if (nextSplitId == null) {
+			finishCurrentFetch(recordsWithSplitIds, output);
+			return false;
+		}
+
+		currentSplitContext = splitStates.get(nextSplitId);
+		checkState(currentSplitContext != null, "Have records for a split that was not registered");
+		currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
+		return true;
+	}
+
 	@Override
 	public CompletableFuture<Void> isAvailable() {
 		// The order matters here. We first get the future. After this point, if the queue
@@ -235,7 +268,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	private InputStatus finishedOrAvailableLater() {
 		boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
-		boolean allElementsEmitted = elementsQueue.isEmpty() && (splitIter == null || !splitIter.hasNext());
+		boolean allElementsEmitted = elementsQueue.isEmpty();
 		if (noMoreSplitsAssignment && allFetchersHaveShutdown && allElementsEmitted) {
 			return InputStatus.END_OF_INPUT;
 		} else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
deleted file mode 100644
index c83bec0..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-/**
- * A class that wraps around a {@link RecordsWithSplitIds} and provide a consistent iterator.
- */
-public class SplitsRecordIterator<E> {
-
-	private final RecordsWithSplitIds<E> recordsWithSplitIds;
-	private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
-	private String currentSplitId;
-	private Iterator<E> recordsIter;
-
-	/**
-	 * Construct a cross-splits iterator for the records.
-	 *
-	 * @param recordsWithSplitIds the records by splits.
-	 */
-	public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
-		this.recordsWithSplitIds = recordsWithSplitIds;
-		Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
-		// Remove empty splits;
-		recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
-		this.splitIter = recordsBySplits.entrySet().iterator();
-	}
-
-	/**
-	 * Whether their are more records available.
-	 *
-	 * @return true if there are more records, false otherwise.
-	 */
-	public boolean hasNext() {
-		if (recordsIter == null || !recordsIter.hasNext()) {
-			if (splitIter.hasNext()) {
-				Map.Entry<String, Collection<E>> entry = splitIter.next();
-				currentSplitId = entry.getKey();
-				recordsIter = entry.getValue().iterator();
-			} else {
-				return false;
-			}
-		}
-		return recordsIter.hasNext() || splitIter.hasNext();
-	}
-
-	/**
-	 * Get the next record.
-	 * @return the next record.
-	 */
-	public E next() {
-		if (!hasNext()) {
-			throw new NoSuchElementException();
-		}
-		return recordsIter.next();
-	}
-
-	/**
-	 * Get the split id of the last returned record.
-	 *
-	 * @return the split id of the last returned record.
-	 */
-	public String currentSplitId() {
-		return currentSplitId;
-	}
-
-	/**
-	 * The split Ids that are finished after all the records in this iterator are emitted.
-	 *
-	 * @return a set of finished split Ids.
-	 */
-	public Set<String> finishedSplitIds() {
-		return recordsWithSplitIds.finishedSplits();
-	}
-
-	public void dispose() {
-		recordsWithSplitIds.recycle();
-	}
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 953d7aa..e9c2ad2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -95,9 +95,13 @@ public class SplitFetcherTest {
 			interrupter.start();
 
 			while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
-				elementQueue.take().recordsBySplits().values().forEach(records ->
-						// Ensure there is no duplicate records.
-						records.forEach(arr -> assertTrue(recordsRead.add(arr[0]))));
+				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
+				while (nextBatch.nextSplit() != null) {
+					int[] arr;
+					while ((arr = nextBatch.nextRecordFromSplit()) != null) {
+						assertTrue(recordsRead.add(arr[0]));
+					}
+				}
 			}
 
 			assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 3c6d8df..00d4d71 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -81,7 +81,8 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	}
 
 	private RecordsBySplits<int[]> getRecords() {
-		RecordsBySplits<int[]> records = new RecordsBySplits<>();
+		final RecordsBySplits.Builder<int[]> records = new RecordsBySplits.Builder<>();
+
 		try {
 			for (Map.Entry<String, MockSourceSplit> entry : splits.entrySet()) {
 				MockSourceSplit split = entry.getValue();
@@ -102,6 +103,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 				throw new RuntimeException("Caught unexpected interrupted exception.");
 			}
 		}
-		return records;
+		return records.build();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
index 2a8377a..3aa49ac 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingRecordsWithSplitIds.java
@@ -18,48 +18,22 @@
 
 package org.apache.flink.connector.base.source.reader.mocks;
 
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * A mock implementation of {@link RecordsWithSplitIds} that returns a given set of records.
  */
-public class TestingRecordsWithSplitIds<E> implements RecordsWithSplitIds<E> {
-
-	private final Map<String, Collection<E>> records;
-
-	private final String splitId;
+public class TestingRecordsWithSplitIds<E> extends RecordsBySplits<E> {
 
 	private volatile boolean isRecycled;
 
 	@SafeVarargs
 	public TestingRecordsWithSplitIds(String splitId, E... records) {
-		this.splitId = checkNotNull(splitId);
-		this.records = new HashMap<>();
-		this.records.put(splitId, Arrays.asList(records));
-	}
-
-	@Override
-	public Collection<String> splitIds() {
-		return Collections.singleton(splitId);
-	}
-
-	@Override
-	public Map<String, Collection<E>> recordsBySplits() {
-		return records;
-	}
-
-	@Override
-	public Set<String> finishedSplits() {
-		return Collections.emptySet();
+		super(Collections.singletonMap(splitId, Arrays.asList(records)), Collections.emptySet());
 	}
 
 	@Override