You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 16:17:33 UTC

[flink] branch master updated: [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new ee5c4c2  [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
ee5c4c2 is described below

commit ee5c4c211c35c70d28252363bbc8400453609977
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 23:36:44 2020 +0200

    [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
    
    This removes the queue (and repeated queue passing logic) and simly passes a list of split changes
    directly and once, for the fetcher to handle.
    
    This closes #13400
---
 .../base/source/reader/fetcher/AddSplitsTask.java     | 18 +++++-------------
 .../base/source/reader/fetcher/SplitFetcher.java      | 10 ++--------
 .../base/source/reader/splitreader/SplitReader.java   |  5 ++---
 .../base/source/reader/SourceReaderBaseTest.java      |  8 ++------
 .../reader/fetcher/SplitFetcherManagerTest.java       |  4 +---
 .../base/source/reader/fetcher/SplitFetcherTest.java  |  2 +-
 .../base/source/reader/mocks/MockBaseSource.java      |  2 +-
 .../base/source/reader/mocks/MockSplitReader.java     | 19 +++++++------------
 .../base/source/reader/mocks/TestingSplitReader.java  |  5 +----
 9 files changed, 22 insertions(+), 51 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 82e529a..12b7d5d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -21,43 +21,35 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 /**
  * The task to add splits.
  */
 class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
+
 	private final SplitReader<?, SplitT> splitReader;
 	private final List<SplitT> splitsToAdd;
-	private final Queue<SplitsChange<SplitT>> splitsChanges;
 	private final Map<String, SplitT> assignedSplits;
-	private boolean splitsChangesAdded;
 
 	AddSplitsTask(
 			SplitReader<?, SplitT> splitReader,
 			List<SplitT> splitsToAdd,
-			Queue<SplitsChange<SplitT>> splitsChanges,
 			Map<String, SplitT> assignedSplits) {
 		this.splitReader = splitReader;
 		this.splitsToAdd = splitsToAdd;
-		this.splitsChanges = splitsChanges;
 		this.assignedSplits = assignedSplits;
-		this.splitsChangesAdded = false;
 	}
 
 	@Override
 	public boolean run() {
-		if (!splitsChangesAdded) {
-			splitsChanges.add(new SplitsAddition<>(splitsToAdd));
-			splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s));
-			splitsChangesAdded = true;
+		for (SplitT s : splitsToAdd) {
+			assignedSplits.put(s.splitId(), s);
 		}
-		splitReader.handleSplitsChanges(splitsChanges);
-		return splitsChanges.isEmpty();
+		splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
+		return true;
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index db5a203..633c452 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -21,17 +21,14 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,8 +44,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private final BlockingDeque<SplitFetcherTask> taskQueue;
 	// track the assigned splits so we can suspend the reader when there is no splits assigned.
 	private final Map<String, SplitT> assignedSplits;
-	/** The current split assignments for this fetcher. */
-	private final Queue<SplitsChange<SplitT>> splitChanges;
 	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 	private final SplitReader<E, SplitT> splitReader;
 	private final Runnable shutdownHook;
@@ -70,7 +65,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 
 		this.id = id;
 		this.taskQueue = new LinkedBlockingDeque<>();
-		this.splitChanges = new LinkedList<>();
 		this.elementsQueue = elementsQueue;
 		this.assignedSplits = new HashMap<>();
 		this.splitReader = splitReader;
@@ -148,7 +142,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	 * @param splitsToAdd the splits to add.
 	 */
 	public void addSplits(List<SplitT> splitsToAdd) {
-		maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
+		maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
 		isIdle = false; // in case we were idle before
 		wakeUp(true);
 	}
@@ -268,7 +262,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	}
 
 	private void checkAndSetIdle() {
-		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
+		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty();
 		if (nowIdle) {
 			isIdle = true;
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 9114614..7504452 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 
 import java.io.IOException;
-import java.util.Queue;
 
 /**
  * An interface used to read from splits. The implementation could either read from a single split or from
@@ -49,9 +48,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
 	/**
 	 * Handle the split changes. This call should be non-blocking.
 	 *
-	 * @param splitsChanges a queue with split changes that has not been handled by this SplitReader.
+	 * @param splitsChanges the split changes that the SplitReader needs to handle.
 	 */
-	void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
+	void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
 
 	/**
 	 * Wake up the split reader in case the fetcher thread is blocking in
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 84eeb4e..ce32521 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Queue;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -73,10 +72,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 				}
 
 				@Override
-				public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
-					// We have to handle split changes first, otherwise fetch will not be called.
-					splitsChanges.clear();
-				}
+				public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
 
 				@Override
 				public void wakeUp() {
@@ -127,7 +123,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
 			new FutureCompletingBlockingQueue<>();
 		MockSplitReader mockSplitReader =
-			new MockSplitReader(2, true, true);
+			new MockSplitReader(2, true);
 		return new MockSourceReader(
 			elementsQueue,
 			() -> mockSplitReader,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 3ff25e0..6d7b8b1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -141,9 +141,7 @@ public class SplitFetcherManagerTest {
 		}
 
 		@Override
-		public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
-			splitsChanges.clear();
-		}
+		public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
 
 		@Override
 		public void wakeUp() {}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e5f5faf..5027e3f 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -166,7 +166,7 @@ public class SplitFetcherTest {
 				new SplitFetcher<>(
 						0,
 						elementQueue,
-						new MockSplitReader(2, true, true),
+						new MockSplitReader(2, true),
 						() -> {});
 
 		// Prepare the splits.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 2681e5a..5eaa4fb 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -75,7 +75,7 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc
 		config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
 		return new MockSourceReader(
 				elementsQueue,
-				() -> new MockSplitReader(2, true, true),
+				() -> new MockSplitReader(2, true),
 				config,
 				readerContext);
 	}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 4cb738a..f3d15f6 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.Queue;
 
 /**
  * A mock split reader for unit tests. The mock split reader provides configurable behaviours.
@@ -42,7 +41,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	private final Map<String, MockSourceSplit> splits = new LinkedHashMap<>();
 	private final int numRecordsPerSplitPerFetch;
 	private final boolean blockingFetch;
-	private final boolean handleSplitsInOneShot;
 
 	private final Object wakeupLock = new Object();
 	private volatile Thread threadInBlocking;
@@ -50,11 +48,9 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 
 	public MockSplitReader(
 			int numRecordsPerSplitPerFetch,
-			boolean blockingFetch,
-			boolean handleSplitsInOneShot) {
+			boolean blockingFetch) {
 		this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
 		this.blockingFetch = blockingFetch;
-		this.handleSplitsInOneShot = handleSplitsInOneShot;
 	}
 
 	@Override
@@ -63,13 +59,12 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
 	}
 
 	@Override
-	public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
-		do {
-			SplitsChange<MockSourceSplit> splitsChange = splitsChanges.poll();
-			if (splitsChange instanceof SplitsAddition) {
-				splitsChange.splits().forEach(s -> splits.put(s.splitId(), s));
-			}
-		} while (handleSplitsInOneShot && !splitsChanges.isEmpty());
+	public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChange) {
+		if (splitsChange instanceof SplitsAddition) {
+			splitsChange.splits().forEach(s -> splits.put(s.splitId(), s));
+		} else {
+			throw new IllegalArgumentException("Do not recognize split change: " + splitsChange);
+		}
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 5643088..2e6c760 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
-import java.util.Queue;
 
 /**
  * A {@code SplitReader} that returns a pre-defined set of records (by split).
@@ -59,9 +58,7 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 	}
 
 	@Override
-	public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
-		splitsChanges.clear();
-	}
+	public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
 
 	@Override
 	public void wakeUp() {