You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/16 16:17:33 UTC
[flink] branch master updated: [FLINK-19251][connectors] Avoid
confusing queue handling in "SplitReader.handleSplitsChanges()"
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new ee5c4c2 [FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
ee5c4c2 is described below
commit ee5c4c211c35c70d28252363bbc8400453609977
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 23:36:44 2020 +0200
[FLINK-19251][connectors] Avoid confusing queue handling in "SplitReader.handleSplitsChanges()"
This removes the queue (and repeated queue passing logic) and simly passes a list of split changes
directly and once, for the fetcher to handle.
This closes #13400
---
.../base/source/reader/fetcher/AddSplitsTask.java | 18 +++++-------------
.../base/source/reader/fetcher/SplitFetcher.java | 10 ++--------
.../base/source/reader/splitreader/SplitReader.java | 5 ++---
.../base/source/reader/SourceReaderBaseTest.java | 8 ++------
.../reader/fetcher/SplitFetcherManagerTest.java | 4 +---
.../base/source/reader/fetcher/SplitFetcherTest.java | 2 +-
.../base/source/reader/mocks/MockBaseSource.java | 2 +-
.../base/source/reader/mocks/MockSplitReader.java | 19 +++++++------------
.../base/source/reader/mocks/TestingSplitReader.java | 5 +----
9 files changed, 22 insertions(+), 51 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
index 82e529a..12b7d5d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/AddSplitsTask.java
@@ -21,43 +21,35 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
/**
* The task to add splits.
*/
class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
+
private final SplitReader<?, SplitT> splitReader;
private final List<SplitT> splitsToAdd;
- private final Queue<SplitsChange<SplitT>> splitsChanges;
private final Map<String, SplitT> assignedSplits;
- private boolean splitsChangesAdded;
AddSplitsTask(
SplitReader<?, SplitT> splitReader,
List<SplitT> splitsToAdd,
- Queue<SplitsChange<SplitT>> splitsChanges,
Map<String, SplitT> assignedSplits) {
this.splitReader = splitReader;
this.splitsToAdd = splitsToAdd;
- this.splitsChanges = splitsChanges;
this.assignedSplits = assignedSplits;
- this.splitsChangesAdded = false;
}
@Override
public boolean run() {
- if (!splitsChangesAdded) {
- splitsChanges.add(new SplitsAddition<>(splitsToAdd));
- splitsToAdd.forEach(s -> assignedSplits.put(s.splitId(), s));
- splitsChangesAdded = true;
+ for (SplitT s : splitsToAdd) {
+ assignedSplits.put(s.splitId(), s);
}
- splitReader.handleSplitsChanges(splitsChanges);
- return splitsChanges.isEmpty();
+ splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
+ return true;
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index db5a203..633c452 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -21,17 +21,14 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
-import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -47,8 +44,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final BlockingDeque<SplitFetcherTask> taskQueue;
// track the assigned splits so we can suspend the reader when there is no splits assigned.
private final Map<String, SplitT> assignedSplits;
- /** The current split assignments for this fetcher. */
- private final Queue<SplitsChange<SplitT>> splitChanges;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final SplitReader<E, SplitT> splitReader;
private final Runnable shutdownHook;
@@ -70,7 +65,6 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
this.id = id;
this.taskQueue = new LinkedBlockingDeque<>();
- this.splitChanges = new LinkedList<>();
this.elementsQueue = elementsQueue;
this.assignedSplits = new HashMap<>();
this.splitReader = splitReader;
@@ -148,7 +142,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
* @param splitsToAdd the splits to add.
*/
public void addSplits(List<SplitT> splitsToAdd) {
- maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
+ maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
isIdle = false; // in case we were idle before
wakeUp(true);
}
@@ -268,7 +262,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
}
private void checkAndSetIdle() {
- final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
+ final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty();
if (nowIdle) {
isIdle = true;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
index 9114614..7504452 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import java.io.IOException;
-import java.util.Queue;
/**
* An interface used to read from splits. The implementation could either read from a single split or from
@@ -49,9 +48,9 @@ public interface SplitReader<E, SplitT extends SourceSplit> {
/**
* Handle the split changes. This call should be non-blocking.
*
- * @param splitsChanges a queue with split changes that has not been handled by this SplitReader.
+ * @param splitsChanges the split changes that the SplitReader needs to handle.
*/
- void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges);
+ void handleSplitsChanges(SplitsChange<SplitT> splitsChanges);
/**
* Wake up the split reader in case the fetcher thread is blocking in
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 84eeb4e..ce32521 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -42,7 +42,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Queue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -73,10 +72,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
}
@Override
- public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
- // We have to handle split changes first, otherwise fetch will not be called.
- splitsChanges.clear();
- }
+ public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}
@Override
public void wakeUp() {
@@ -127,7 +123,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
- new MockSplitReader(2, true, true);
+ new MockSplitReader(2, true);
return new MockSourceReader(
elementsQueue,
() -> mockSplitReader,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
index 3ff25e0..6d7b8b1 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManagerTest.java
@@ -141,9 +141,7 @@ public class SplitFetcherManagerTest {
}
@Override
- public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
- splitsChanges.clear();
- }
+ public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
@Override
public void wakeUp() {}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e5f5faf..5027e3f 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -166,7 +166,7 @@ public class SplitFetcherTest {
new SplitFetcher<>(
0,
elementQueue,
- new MockSplitReader(2, true, true),
+ new MockSplitReader(2, true),
() -> {});
// Prepare the splits.
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index 2681e5a..5eaa4fb 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -75,7 +75,7 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc
config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
return new MockSourceReader(
elementsQueue,
- () -> new MockSplitReader(2, true, true),
+ () -> new MockSplitReader(2, true),
config,
readerContext);
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
index 4cb738a..f3d15f6 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitReader.java
@@ -27,7 +27,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Queue;
/**
* A mock split reader for unit tests. The mock split reader provides configurable behaviours.
@@ -42,7 +41,6 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
private final Map<String, MockSourceSplit> splits = new LinkedHashMap<>();
private final int numRecordsPerSplitPerFetch;
private final boolean blockingFetch;
- private final boolean handleSplitsInOneShot;
private final Object wakeupLock = new Object();
private volatile Thread threadInBlocking;
@@ -50,11 +48,9 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
public MockSplitReader(
int numRecordsPerSplitPerFetch,
- boolean blockingFetch,
- boolean handleSplitsInOneShot) {
+ boolean blockingFetch) {
this.numRecordsPerSplitPerFetch = numRecordsPerSplitPerFetch;
this.blockingFetch = blockingFetch;
- this.handleSplitsInOneShot = handleSplitsInOneShot;
}
@Override
@@ -63,13 +59,12 @@ public class MockSplitReader implements SplitReader<int[], MockSourceSplit> {
}
@Override
- public void handleSplitsChanges(Queue<SplitsChange<MockSourceSplit>> splitsChanges) {
- do {
- SplitsChange<MockSourceSplit> splitsChange = splitsChanges.poll();
- if (splitsChange instanceof SplitsAddition) {
- splitsChange.splits().forEach(s -> splits.put(s.splitId(), s));
- }
- } while (handleSplitsInOneShot && !splitsChanges.isEmpty());
+ public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChange) {
+ if (splitsChange instanceof SplitsAddition) {
+ splitsChange.splits().forEach(s -> splits.put(s.splitId(), s));
+ } else {
+ throw new IllegalArgumentException("Do not recognize split change: " + splitsChange);
+ }
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index 5643088..2e6c760 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
-import java.util.Queue;
/**
* A {@code SplitReader} that returns a pre-defined set of records (by split).
@@ -59,9 +58,7 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
}
@Override
- public void handleSplitsChanges(Queue<SplitsChange<SplitT>> splitsChanges) {
- splitsChanges.clear();
- }
+ public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {}
@Override
public void wakeUp() {