You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/11 16:33:37 UTC

[flink] 07/10: [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e3d273de822b085183d09b275a445879ff94b350
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 1 17:19:52 2020 +0200

    [FLINK-19162][connectors] Add 'recycle()' to the RecordsWithSplitIds to support reuse of heavy objects.
---
 .../base/source/reader/RecordsWithSplitIds.java    |  9 +++
 .../base/source/reader/SourceReaderBase.java       |  1 +
 .../base/source/reader/SplitsRecordIterator.java   | 14 ++--
 .../base/source/reader/SourceReaderBaseTest.java   | 79 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
index f616125..dc915b3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/RecordsWithSplitIds.java
@@ -47,4 +47,13 @@ public interface RecordsWithSplitIds<E> {
 	 * @return the finished splits after this RecordsWithSplitIds is returned.
 	 */
 	Set<String> finishedSplits();
+
+	/**
+	 * This method is called when all records from this batch have been emitted.
+	 *
+	 * <p>Overriding this method gives implementations the opportunity to recycle/reuse this object,
+	 * which is a performance optimization that is important for cases where the record objects are
+	 * large or otherwise heavy to allocate.
+	 */
+	default void recycle() {}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index e01180e..4a41d49 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -149,6 +149,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				});
 				// Handle the finished splits.
 				onSplitFinished(splitIter.finishedSplitIds());
+				splitIter.dispose();
 				// Prepare the return status based on the availability of the next element.
 				status = finishedOrAvailableLater();
 			} else {
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
index d7b7b76..c83bec0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SplitsRecordIterator.java
@@ -28,8 +28,8 @@ import java.util.Set;
  * A class that wraps around a {@link RecordsWithSplitIds} and provides a consistent iterator.
  */
 public class SplitsRecordIterator<E> {
-	private final Map<String, Collection<E>> recordsBySplits;
-	private final Set<String> finishedSplitIds;
+
+	private final RecordsWithSplitIds<E> recordsWithSplitIds;
 	private final Iterator<Map.Entry<String, Collection<E>>> splitIter;
 	private String currentSplitId;
 	private Iterator<E> recordsIter;
@@ -40,11 +40,11 @@ public class SplitsRecordIterator<E> {
 	 * @param recordsWithSplitIds the records by splits.
 	 */
 	public SplitsRecordIterator(RecordsWithSplitIds<E> recordsWithSplitIds) {
-		this.recordsBySplits = recordsWithSplitIds.recordsBySplits();
+		this.recordsWithSplitIds = recordsWithSplitIds;
+		Map<String, Collection<E>> recordsBySplits = recordsWithSplitIds.recordsBySplits();
 		// Remove empty splits.
 		recordsBySplits.entrySet().removeIf(e -> e.getValue().isEmpty());
 		this.splitIter = recordsBySplits.entrySet().iterator();
-		this.finishedSplitIds = recordsWithSplitIds.finishedSplits();
 	}
 
 	/**
@@ -91,6 +91,10 @@ public class SplitsRecordIterator<E> {
 	 * @return a set of finished split Ids.
 	 */
 	public Set<String> finishedSplitIds() {
-		return finishedSplitIds;
+		return recordsWithSplitIds.finishedSplits();
+	}
+
+	public void dispose() {
+		recordsWithSplitIds.recycle();
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index a332efe..0ec4297 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -19,10 +19,17 @@
 package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
+import org.apache.flink.connector.base.source.reader.mocks.TestingReaderContext;
+import org.apache.flink.connector.base.source.reader.mocks.TestingReaderOutput;
+import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
@@ -33,10 +40,14 @@ import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Queue;
 
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 /**
  * A unit test class for {@link SourceReaderBase}.
  */
@@ -89,6 +100,29 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		}
 	}
 
+	@Test
+	public void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
+		final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+		final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+
+		reader.pollNext(new TestingReaderOutput<>());
+
+		assertFalse(records.isRecycled());
+	}
+
+	@Test
+	public void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
+		final TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<>("test-split", "value1", "value2");
+		final SourceReader<?, ?> reader = createReaderAndAwaitAvailable("test-split", records);
+
+		// poll thrice: twice to get all records, one more to trigger the recycle and move to the next split
+		reader.pollNext(new TestingReaderOutput<>());
+		reader.pollNext(new TestingReaderOutput<>());
+		reader.pollNext(new TestingReaderOutput<>());
+
+		assertTrue(records.isRecycled());
+	}
+
 	// ---------------- helper methods -----------------
 
 	@Override
@@ -140,4 +174,49 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
 		return config;
 	}
+
+	// ------------------------------------------------------------------------
+	//  Testing Setup Helpers
+	// ------------------------------------------------------------------------
+
+	private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(
+		final String splitId,
+		final RecordsWithSplitIds<E> records) throws Exception {
+
+		final FutureNotifier futureNotifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
+			new FutureCompletingBlockingQueue<>(futureNotifier);
+
+		final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
+			futureNotifier,
+			elementsQueue,
+			() -> new TestingSplitReader<E, TestingSourceSplit>(records),
+			new PassThroughRecordEmitter<E, TestingSourceSplit>(),
+			new Configuration(),
+			new TestingReaderContext()) {
+
+			@Override
+			protected void onSplitFinished(Collection<String> finishedSplitIds) {
+			}
+
+			@Override
+			protected TestingSourceSplit initializedState(TestingSourceSplit split) {
+				return split;
+			}
+
+			@Override
+			protected TestingSourceSplit toSplitType(String splitId, TestingSourceSplit splitState) {
+				return splitState;
+			}
+		};
+
+		reader.start();
+
+		final List<TestingSourceSplit> splits = Collections.singletonList(new TestingSourceSplit(splitId));
+		reader.addSplits(splits);
+
+		reader.isAvailable().get();
+
+		return reader;
+	}
 }