You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:56 UTC
[flink] 08/11: [FLINK-18128][connectors] Ensure idle split fetchers
lead to availability notifications.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 511857049ba30c8ff0ee56da551fa4a479dc583e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 20:55:57 2020 +0200
[FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.
---
.../base/source/reader/fetcher/SplitFetcher.java | 23 ++-
.../FutureCompletingBlockingQueue.java | 4 +
.../source/reader/fetcher/SplitFetcherTest.java | 185 +++++++++++++++++++++
3 files changed, 206 insertions(+), 6 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 289dc34..3beb0da 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -57,6 +57,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final AtomicBoolean closed;
private FetchTask<E, SplitT> fetchTask;
private volatile SplitFetcherTask runningTask = null;
+
+ /** Flag whether this fetcher has no work assigned at the moment.
+ * Fetchers that have work (a split) assigned but are currently blocked (for example enqueueing
+ * a fetch and hitting the element queue limit) are NOT considered idle. */
private volatile boolean isIdle;
SplitFetcher(
@@ -81,7 +85,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
elementsQueue,
ids -> {
ids.forEach(assignedSplits::remove);
- updateIsIdle();
+ checkAndSetIdle();
},
id);
}
@@ -168,7 +172,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
*/
public void addSplits(List<SplitT> splitsToAdd) {
maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
- updateIsIdle();
+ isIdle = false; // in case we were idle before
wakeUp(true);
}
@@ -292,6 +296,17 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
}
+ private void checkAndSetIdle() {
+ final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
+ if (nowIdle) {
+ isIdle = true;
+
+ // because the method might get invoked past the point when the source reader last checked
+ // the elements queue, we need to notify availability in the case when we become idle
+ elementsQueue.notifyAvailable();
+ }
+ }
+
//--------------------- Helper class ------------------
private static class DummySplitFetcherTask implements SplitFetcherTask {
@@ -316,8 +331,4 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
return name;
}
}
-
- private void updateIsIdle() {
- isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty();
- }
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index ea0f030..dcbb66e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -190,6 +190,10 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ public void notifyAvailable() {
+ futureNotifier.notifyComplete();
+ }
+
// --------------- private helpers -------------------------
private void enqueue(T element) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 4fa99dd..6e27d95 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -19,10 +19,15 @@
package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.testutils.CheckedThread;
import org.junit.Test;
@@ -31,10 +36,12 @@ import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@@ -43,6 +50,113 @@ import static org.junit.Assert.assertTrue;
public class SplitFetcherTest {
@Test
+ public void testNewFetcherIsIdle() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testFetcherNotIdleAfterSplitAdded() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+ final TestingSourceSplit split = new TestingSourceSplit("test-split");
+
+ fetcher.addSplits(Collections.singletonList(split));
+
+ assertFalse(fetcher.isIdle());
+
+ // need to loop here because, due to the internal wakeup flag handling, a single runOnce() call may not be enough
+ while (fetcher.assignedSplits().isEmpty()) {
+ fetcher.runOnce();
+ assertFalse(fetcher.isIdle());
+ }
+ }
+
+ @Test
+ public void testIdleAfterFinishedSplitsEnqueued() {
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split", new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ }
+
+ @Test
+ public void testNotifiesWhenGoingIdle() {
+ final FutureNotifier notifier = new FutureNotifier();
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split",
+ new FutureCompletingBlockingQueue<>(notifier),
+ new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ assertTrue(notifier.future().isDone());
+ }
+
+ @Test
+ public void testNotifiesOlderFutureWhenGoingIdle() {
+ final FutureNotifier notifier = new FutureNotifier();
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split",
+ new FutureCompletingBlockingQueue<>(notifier),
+ new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ final CompletableFuture<?> future = notifier.future();
+
+ fetcher.runOnce();
+
+ assertTrue(fetcher.assignedSplits().isEmpty());
+ assertTrue(fetcher.isIdle());
+ assertTrue(future.isDone());
+ }
+
+ @Test
+ public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
+ final FutureNotifier notifier = new FutureNotifier();
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+ new FutureCompletingBlockingQueue<>(notifier);
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+ queueDrainer.start();
+
+ try {
+ fetcher.runOnce();
+
+ assertTrue(notifier.future().isDone());
+ } finally {
+ queueDrainer.shutdown();
+ }
+ }
+
+ @Test
+ public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
+ final FutureNotifier notifier = new FutureNotifier();
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+ new FutureCompletingBlockingQueue<>(notifier);
+ final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+ "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+ final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+ queueDrainer.start();
+
+ final CompletableFuture<?> future = notifier.future();
+
+ try {
+ fetcher.runOnce();
+
+ assertTrue(future.isDone());
+ } finally {
+ queueDrainer.shutdown();
+ }
+ }
+
+ @Test
public void testWakeup() throws InterruptedException {
final int numSplits = 3;
final int numRecordsPerSplit = 10_000;
@@ -118,4 +232,75 @@ public class SplitFetcherTest {
interrupter.join();
}
}
+
+ // ------------------------------------------------------------------------
+ // testing utils
+ // ------------------------------------------------------------------------
+
+ private static <E> RecordsBySplits<E> finishedSplitFetch(String splitId) {
+ return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(splitId));
+ }
+
+ private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+ final SplitReader<E, TestingSourceSplit> reader) {
+ return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+ }
+
+ private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+ final SplitReader<E, TestingSourceSplit> reader,
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
+ return new SplitFetcher<>(0, queue, reader, () -> {});
+ }
+
+ private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+ final String splitId,
+ final SplitReader<E, TestingSourceSplit> reader) {
+ return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+ }
+
+ private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+ final String splitId,
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+ final SplitReader<E, TestingSourceSplit> reader) {
+
+ final SplitFetcher<E, TestingSourceSplit> fetcher = createFetcher(reader, queue);
+
+ fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
+ while (fetcher.assignedSplits().isEmpty()) {
+ fetcher.runOnce();
+ }
+ return fetcher;
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static final class QueueDrainerThread extends CheckedThread {
+
+ private final FutureCompletingBlockingQueue<?> queue;
+ private volatile boolean running = true;
+
+ QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+ super("Queue Drainer");
+ setPriority(Thread.MAX_PRIORITY);
+ this.queue = queue;
+ }
+
+ @Override
+ public void go() throws Exception {
+ while (running) {
+ try {
+ queue.take();
+ }
+ catch (InterruptedException ignored) {
+ // fall through the loop
+ }
+ }
+ }
+
+ public void shutdown() throws Exception {
+ running = false;
+ interrupt();
+ sync();
+ }
+ }
}