You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:56 UTC

[flink] 08/11: [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 511857049ba30c8ff0ee56da551fa4a479dc583e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 20:55:57 2020 +0200

    [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.
---
 .../base/source/reader/fetcher/SplitFetcher.java   |  23 ++-
 .../FutureCompletingBlockingQueue.java             |   4 +
 .../source/reader/fetcher/SplitFetcherTest.java    | 185 +++++++++++++++++++++
 3 files changed, 206 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 289dc34..3beb0da 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -57,6 +57,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private final AtomicBoolean closed;
 	private FetchTask<E, SplitT> fetchTask;
 	private volatile SplitFetcherTask runningTask = null;
+
+	/** Flag whether this fetcher has no work assigned at the moment.
+	 * Fetcher that have work (a split) assigned but are currently blocked (for example enqueueing
+	 * a fetch and hitting the element queue limit) are NOT considered idle. */
 	private volatile boolean isIdle;
 
 	SplitFetcher(
@@ -81,7 +85,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 				elementsQueue,
 				ids -> {
 					ids.forEach(assignedSplits::remove);
-					updateIsIdle();
+					checkAndSetIdle();
 				},
 				id);
 	}
@@ -168,7 +172,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	 */
 	public void addSplits(List<SplitT> splitsToAdd) {
 		maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
-		updateIsIdle();
+		isIdle = false; // in case we were idle before
 		wakeUp(true);
 	}
 
@@ -292,6 +296,17 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 
 	}
 
+	private void checkAndSetIdle() {
+		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
+		if (nowIdle) {
+			isIdle = true;
+
+			// because the method might get invoked past the point when the source reader last checked
+			// the elements queue, we need to notify availability in the case when we become idle
+			elementsQueue.notifyAvailable();
+		}
+	}
+
 	//--------------------- Helper class ------------------
 
 	private static class DummySplitFetcherTask implements SplitFetcherTask {
@@ -316,8 +331,4 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			return name;
 		}
 	}
-
-	private void updateIsIdle() {
-		isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty();
-	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index ea0f030..dcbb66e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -190,6 +190,10 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	public void notifyAvailable() {
+		futureNotifier.notifyComplete();
+	}
+
 	// --------------- private helpers -------------------------
 
 	private void enqueue(T element) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 4fa99dd..6e27d95 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -19,10 +19,15 @@
 package org.apache.flink.connector.base.source.reader.fetcher;
 
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
 
@@ -31,10 +36,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -43,6 +50,113 @@ import static org.junit.Assert.assertTrue;
 public class SplitFetcherTest {
 
 	@Test
+	public void testNewFetcherIsIdle() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+		assertTrue(fetcher.isIdle());
+	}
+
+	@Test
+	public void testFetcherNotIdleAfterSplitAdded() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+		final TestingSourceSplit split = new TestingSourceSplit("test-split");
+
+		fetcher.addSplits(Collections.singletonList(split));
+
+		assertFalse(fetcher.isIdle());
+
+		// need to loop here because the internal wakeup flag handling means we need multiple loops
+		while (fetcher.assignedSplits().isEmpty()) {
+			fetcher.runOnce();
+			assertFalse(fetcher.isIdle());
+		}
+	}
+
+	@Test
+	public void testIdleAfterFinishedSplitsEnqueued() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+	}
+
+	@Test
+	public void testNotifiesWhenGoingIdle() {
+		final FutureNotifier notifier = new FutureNotifier();
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split",
+			new FutureCompletingBlockingQueue<>(notifier),
+			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+		assertTrue(notifier.future().isDone());
+	}
+
+	@Test
+	public void testNotifiesOlderFutureWhenGoingIdle() {
+		final FutureNotifier notifier = new FutureNotifier();
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split",
+			new FutureCompletingBlockingQueue<>(notifier),
+			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final CompletableFuture<?> future = notifier.future();
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
+		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+				new FutureCompletingBlockingQueue<>(notifier);
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+		queueDrainer.start();
+
+		try {
+			fetcher.runOnce();
+
+			assertTrue(notifier.future().isDone());
+		} finally {
+			queueDrainer.shutdown();
+		}
+	}
+
+	@Test
+	public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
+		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+			new FutureCompletingBlockingQueue<>(notifier);
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+		queueDrainer.start();
+
+		final CompletableFuture<?> future = notifier.future();
+
+		try {
+			fetcher.runOnce();
+
+			assertTrue(future.isDone());
+		} finally {
+			queueDrainer.shutdown();
+		}
+	}
+
+	@Test
 	public void testWakeup() throws InterruptedException {
 		final int numSplits = 3;
 		final int numRecordsPerSplit = 10_000;
@@ -118,4 +232,75 @@ public class SplitFetcherTest {
 			interrupter.join();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  testing utils
+	// ------------------------------------------------------------------------
+
+	private static <E> RecordsBySplits<E> finishedSplitFetch(String splitId) {
+		return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(splitId));
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+			final SplitReader<E, TestingSourceSplit> reader) {
+		return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+			final SplitReader<E, TestingSourceSplit> reader,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
+		return new SplitFetcher<>(0, queue, reader, () -> {});
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+			final String splitId,
+			final SplitReader<E, TestingSourceSplit> reader) {
+		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+			final String splitId,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+			final SplitReader<E, TestingSourceSplit> reader) {
+
+		final SplitFetcher<E, TestingSourceSplit> fetcher = createFetcher(reader, queue);
+
+		fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
+		while (fetcher.assignedSplits().isEmpty()) {
+			fetcher.runOnce();
+		}
+		return fetcher;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class QueueDrainerThread extends CheckedThread {
+
+		private final FutureCompletingBlockingQueue<?> queue;
+		private volatile boolean running = true;
+
+		QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+			super("Queue Drainer");
+			setPriority(Thread.MAX_PRIORITY);
+			this.queue = queue;
+		}
+
+		@Override
+		public void go() throws Exception {
+			while (running) {
+				try {
+					queue.take();
+				}
+				catch (InterruptedException ignored) {
+					// fall through the loop
+				}
+			}
+		}
+
+		public void shutdown() throws Exception {
+			running = false;
+			interrupt();
+			sync();
+		}
+	}
 }