You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:57 UTC

[flink] 09/11: [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ea95782b4c6a2538153d4d16ad3f4839c7de0fb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 23:48:01 2020 +0200

    [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector
    
    This implements a model closer to the AvailabilityListener and AvailabilityHelper in the flink-runtime.
    
    This closes #13385
---
 .../SingleThreadMultiplexSourceReaderBase.java     |   5 +-
 .../base/source/reader/SourceReaderBase.java       |  21 +-
 .../reader/fetcher/SingleThreadFetcherManager.java |   4 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   5 +-
 .../FutureCompletingBlockingQueue.java             | 259 +++++++++++++++++----
 .../reader/synchronization/FutureNotifier.java     |  66 ------
 .../base/source/reader/SourceReaderBaseTest.java   |  13 +-
 .../source/reader/fetcher/SplitFetcherTest.java    |  36 ++-
 .../base/source/reader/mocks/MockBaseSource.java   |   5 +-
 .../base/source/reader/mocks/MockSourceReader.java |   6 +-
 .../FutureCompletingBlockingQueueTest.java         |  32 ++-
 .../reader/synchronization/FutureNotifierTest.java | 131 -----------
 12 files changed, 261 insertions(+), 322 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 3239f28..ab87db0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.function.Supplier;
 
@@ -40,16 +39,14 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends
 	extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
 	public SingleThreadMultiplexSourceReaderBase(
-		FutureNotifier futureNotifier,
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 		Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
 		RecordEmitter<E, T, SplitStateT> recordEmitter,
 		Configuration config,
 		SourceReaderContext context) {
 		super(
-			futureNotifier,
 			elementsQueue,
-			new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier),
+			new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
 			recordEmitter,
 			config,
 			context);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 0305e2d..fb4e6df9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.InputStatus;
 
 import org.slf4j.Logger;
@@ -61,9 +60,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		implements SourceReader<T, SplitT> {
 	private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
 
-	/** A future notifier to notify when this reader requires attention. */
-	private final FutureNotifier futureNotifier;
-
 	/** A queue to buffer the elements fetched by the fetcher thread. */
 	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
@@ -94,13 +90,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private boolean noMoreSplitsAssignment;
 
 	public SourceReaderBase(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			SplitFetcherManager<E, SplitT> splitFetcherManager,
 			RecordEmitter<E, T, SplitStateT> recordEmitter,
 			Configuration config,
 			SourceReaderContext context) {
-		this.futureNotifier = futureNotifier;
 		this.elementsQueue = elementsQueue;
 		this.splitFetcherManager = splitFetcherManager;
 		this.recordEmitter = recordEmitter;
@@ -203,18 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	@Override
 	public CompletableFuture<Void> isAvailable() {
-		// The order matters here. We first get the future. After this point, if the queue
-		// is empty or there is no error in the split fetcher manager, we can ensure that
-		// the future will be completed by the fetcher once it put an element into the element queue,
-		// or it will be completed when an error occurs.
-		CompletableFuture<Void> future = futureNotifier.future();
-		splitFetcherManager.checkErrors();
-		if (!elementsQueue.isEmpty()) {
-			// The fetcher got the new elements after the last poll, or their is a finished split.
-			// Simply complete the future and return;
-			futureNotifier.notifyComplete();
-		}
-		return future;
+		return currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : elementsQueue.getAvailabilityFuture();
 	}
 
 	@Override
@@ -239,7 +222,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		if (sourceEvent instanceof NoMoreSplitsEvent) {
 			LOG.info("Reader received NoMoreSplits event.");
 			noMoreSplitsAssignment = true;
-			futureNotifier.notifyComplete();
+			elementsQueue.notifyAvailable();
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index bd5879f..339c533 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.List;
 import java.util.function.Supplier;
@@ -34,10 +33,9 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
 		extends SplitFetcherManager<E, SplitT> {
 
 	public SingleThreadFetcherManager(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
-		super(futureNotifier, elementsQueue, splitReaderSupplier);
+		super(elementsQueue, splitReaderSupplier);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 26d92e3..ffac523 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.util.ThrowableCatchingRunnable;
 
 import org.slf4j.Logger;
@@ -79,12 +78,10 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 	/**
 	 * Create a split fetcher manager.
 	 *
-	 * @param futureNotifier a notifier to notify the complete of a future.
 	 * @param elementsQueue the queue that split readers will put elements into.
 	 * @param splitReaderFactory a supplier that could be used to create split readers.
 	 */
 	public SplitFetcherManager(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
 		this.elementsQueue = elementsQueue;
@@ -96,7 +93,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 					// Add the exception to the exception list.
 					uncaughtFetcherException.get().addSuppressed(t);
 					// Wake up the main thread to let it know the exception.
-					futureNotifier.notifyComplete();
+					elementsQueue.notifyAvailable();
 				}
 			}
 		};
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index dcbb66e..c89b682 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,62 +18,174 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A custom implementation of blocking queue with the following features.
- * <ul>
- *     <li>
- *         It allows a consuming thread to be notified asynchronously on element availability when the
- *         queue is empty.
- *     </li>
- *     <li>
- *         Allows the putting threads to be gracefully waken up without interruption.
- *     </li>
- * </ul>
+ * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is
+ * used in the hand-over of data from a producing thread to a consuming thread.
+ * This FutureCompletingBlockingQueue has the following features:
+ *
+ * <h3>Consumer Notifications</h3>
+ *
+ * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the
+ * {@link #poll()} method, this queue offers a {@link CompletableFuture}, obtained via the
+ * {@link #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty.
+ * A consumer can thus subscribe to asynchronous notifications for availability by adding a handler
+ * to the obtained {@code CompletableFuture}.
+ *
+ * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way the
+ * consumer may be notified of a situation/condition without adding an element to the queue.
+ *
+ * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()} finds an empty queue
+ * or results in an empty queue (takes the last element).
+ *
+ * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the
+ * availability future completes despite there being no data availabile in the queue. The consumer is
+ * responsible for polling data and obtaining another future to wait on. This is similar to the way
+ * that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting threads
+ * and commonly need to be used in loop with the waiting condition.
+ *
+ * <h3>Producer Wakeup</h3>
+ *
+ * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue
+ * capacity limits, without interrupting the thread. This is done via the {@link #wakeUpPuttingThread(int)}
+ * method.
  *
  * @param <T> the type of the elements in the queue.
  */
 public class FutureCompletingBlockingQueue<T> {
+
+	/**
+	 * A constant future that is complete, indicating availability. Using this constant in cases that
+	 * are guaranteed available helps short-circuiting some checks and avoiding volatile memory operations.
+	 */
+	public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();
+
+	/**
+	 * The default capacity for the queue.
+	 */
+	private static final int DEFAULT_CAPACITY = 1;
+
+	// ------------------------------------------------------------------------
+
+	/** The maximum capacity of the queue. */
 	private final int capacity;
-	private final FutureNotifier futureNotifier;
 
-	/** The element queue. */
-	private final Queue<T> queue;
+	/** The availability future. This doubles as a "non empty" condition. This value is never null.*/
+	private CompletableFuture<Void> currentFuture;
+
 	/** The lock for synchronization. */
 	private final Lock lock;
+
+	/** The element queue. */
+	@GuardedBy("lock")
+	private final Queue<T> queue;
+
 	/** The per-thread conditions that are waiting on putting elements. */
+	@GuardedBy("lock")
 	private final Queue<Condition> notFull;
-	/** The shared conditions for getting elements. */
-	private final Condition notEmpty;
+
 	/** The per-thread conditions and wakeUp flags. */
+	@GuardedBy("lock")
 	private ConditionAndFlag[] putConditionAndFlags;
 
-	/**
-	 * The default capacity for the queue.
-	 */
-	private static final Integer DEFAULT_CAPACITY = 1;
-
-	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
-		this(futureNotifier, DEFAULT_CAPACITY);
+	public FutureCompletingBlockingQueue() {
+		this(DEFAULT_CAPACITY);
 	}
 
-	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
+	public FutureCompletingBlockingQueue(int capacity) {
+		checkArgument(capacity > 0, "capacity must be > 0");
 		this.capacity = capacity;
-		this.futureNotifier = futureNotifier;
 		this.queue = new ArrayDeque<>(capacity);
 		this.lock = new ReentrantLock();
 		this.putConditionAndFlags = new ConditionAndFlag[1];
 		this.notFull = new ArrayDeque<>();
-		this.notEmpty = lock.newCondition();
+
+		// initially the queue is empty and thus unavailable
+		this.currentFuture = new CompletableFuture<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Future / Notification logic
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the availability future. If the queue is non-empty, then this future will already
+	 * be complete. Otherwise the obtained future is guaranteed to get completed the next time
+	 * the queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
+	 *
+	 * <p>It is important that a completed future is no guarantee that the next call to
+	 * {@link #poll()} will return a non-null element. If there are concurrent consumer, another
+	 * consumer may have taken the available element. Or there was no element in the first place,
+	 * because the future was completed through a call to {@link #notifyAvailable()}.
+	 *
+	 * <p>For that reason, it is important to call this method (to obtain a new future) every
+	 * time again after {@link #poll()} returned null and you want to wait for data.
+	 */
+	public CompletableFuture<Void> getAvailabilityFuture() {
+		return currentFuture;
+	}
+
+	/**
+	 * Makes sure the availability future is complete, if it is not complete already.
+	 * All futures returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to
+	 * be completed.
+	 *
+	 * <p>All future calls to the method will return a completed future, until the point
+	 * that the availability is reset via calls to {@link #poll()} that leave the queue empty.
+	 */
+	public void notifyAvailable() {
+		lock.lock();
+		try {
+			moveToAvailable();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Internal utility to make sure that the current future futures are complete (until reset).
+	 */
+	@GuardedBy("lock")
+	private void moveToAvailable() {
+		final CompletableFuture<Void> current = currentFuture;
+		if (current != AVAILABLE) {
+			currentFuture = AVAILABLE;
+			current.complete(null);
+		}
 	}
 
 	/**
+	 * Makes sure the availability future is incomplete, if it was complete before.
+	 */
+	@GuardedBy("lock")
+	private void moveToUnAvailable() {
+		if (currentFuture == AVAILABLE) {
+			currentFuture = new CompletableFuture<>();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Blocking Queue Logic
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Put an element into the queue. The thread blocks if the queue is full.
 	 *
 	 * @param threadIndex the index of the thread.
@@ -101,25 +213,40 @@ public class FutureCompletingBlockingQueue<T> {
 	}
 
 	/**
-	 * Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience.
+	 * A method that blocks until availability does not go together well with the concept of
+	 * asynchronous notifications and non-blocking polling.
+	 *
+	 * <p>Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 * The problem with this method is that it may loop internally until an element is available and
+	 * that way eagerly reset the availability future. If a consumer thread is blocked in taking an
+	 * element, it will receive availability notifications from {@link #notifyAvailable()} and immediately
+	 * reset them by calling {@link #poll()} and finding the queue empty.
 	 *
 	 * @return the first element in the queue.
 	 * @throws InterruptedException when the thread is interrupted.
 	 */
-	public T take() throws InterruptedException{
-		lock.lock();
-		try {
-			while (queue.size() == 0) {
-				notEmpty.await();
+	@VisibleForTesting
+	public T take() throws InterruptedException {
+		T next;
+		while ((next = poll()) == null) {
+			// use the future to wait for availability to avoid busy waiting
+			try {
+				getAvailabilityFuture().get();
+			} catch (ExecutionException | CompletionException e) {
+				// this should never happen, but we propagate just in case
+				throw new FlinkRuntimeException("exception in queue future completion", e);
 			}
-			return dequeue();
-		} finally {
-			lock.unlock();
 		}
+		return next;
 	}
 
 	/**
-	 * Get and remove the first element from the queue. Null is retuned if the queue is empty.
+	 * Get and remove the first element from the queue. Null is returned if the queue is empty.
+	 * If this makes the queue empty (takes the last element) or finds the queue already empty,
+	 * then this resets the availability notifications. The next call to {@link #getAvailabilityFuture()}
+	 * will then return a non-complete future that completes only the next time that the queue
+	 * becomes non-empty or the {@link #notifyAvailable()} method is called.
 	 *
 	 * @return the first element from the queue, or Null if the queue is empty.
 	 */
@@ -127,6 +254,7 @@ public class FutureCompletingBlockingQueue<T> {
 		lock.lock();
 		try {
 			if (queue.size() == 0) {
+				moveToUnAvailable();
 				return null;
 			}
 			return dequeue();
@@ -149,6 +277,9 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Gets the size of the queue.
+	 */
 	public int size() {
 		lock.lock();
 		try {
@@ -158,6 +289,9 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Checks whether the queue is empty.
+	 */
 	public boolean isEmpty() {
 		lock.lock();
 		try {
@@ -167,6 +301,10 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Checks the remaining capacity in the queue. That is the difference between the maximum capacity
+	 * and the current number of elements in the queue.
+	 */
 	public int remainingCapacity() {
 		lock.lock();
 		try {
@@ -176,6 +314,16 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in
+	 * adding an element. to the queue. If the thread is blocked in {@link #put(int, Object)} it will
+	 * immediately return from the method with a return value of false.
+	 *
+	 * <p>If this method is called, the next time the thread with the given index is about to be blocked
+	 * in adding an element, it may immediately wake up and return.
+	 *
+	 * @param threadIndex The number identifying the thread.
+	 */
 	public void wakeUpPuttingThread(int threadIndex) {
 		lock.lock();
 		try {
@@ -190,36 +338,34 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
-	public void notifyAvailable() {
-		futureNotifier.notifyComplete();
-	}
-
 	// --------------- private helpers -------------------------
 
+	@GuardedBy("lock")
 	private void enqueue(T element) {
-		int sizeBefore = queue.size();
+		final int sizeBefore = queue.size();
 		queue.add(element);
-		futureNotifier.notifyComplete();
 		if (sizeBefore == 0) {
-			notEmpty.signal();
+			moveToAvailable();
 		}
 		if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
 			signalNextPutter();
 		}
 	}
 
+	@GuardedBy("lock")
 	private T dequeue() {
-		int sizeBefore = queue.size();
-		T element = queue.poll();
+		final int sizeBefore = queue.size();
+		final T element = queue.poll();
 		if (sizeBefore == capacity && !notFull.isEmpty()) {
 			signalNextPutter();
 		}
-		if (sizeBefore > 1) {
-			notEmpty.signal();
+		if (queue.isEmpty()) {
+			moveToUnAvailable();
 		}
 		return element;
 	}
 
+	@GuardedBy("lock")
 	private void waitOnPut(int fetcherIndex) throws InterruptedException {
 		maybeCreateCondition(fetcherIndex);
 		Condition cond = putConditionAndFlags[fetcherIndex].condition();
@@ -227,12 +373,14 @@ public class FutureCompletingBlockingQueue<T> {
 		cond.await();
 	}
 
+	@GuardedBy("lock")
 	private void signalNextPutter() {
 		if (!notFull.isEmpty()) {
 			notFull.poll().signal();
 		}
 	}
 
+	@GuardedBy("lock")
 	private void maybeCreateCondition(int threadIndex) {
 		if (putConditionAndFlags.length < threadIndex + 1) {
 			putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
@@ -243,6 +391,7 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	@GuardedBy("lock")
 	private boolean getAndResetWakeUpFlag(int threadIndex) {
 		maybeCreateCondition(threadIndex);
 		if (putConditionAndFlags[threadIndex].getWakeUp()) {
@@ -275,4 +424,22 @@ public class FutureCompletingBlockingQueue<T> {
 			wakeUp = value;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static CompletableFuture<Void> getAvailableFuture() {
+		// this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to
+		// move the class from the runtime module to the core module
+		try {
+			final Class<?> clazz = Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
+			final Field field = clazz.getDeclaredField("AVAILABLE");
+			return (CompletableFuture<Void>) field.get(null);
+		}
+		catch (Throwable t) {
+			return CompletableFuture.completedFuture(null);
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
deleted file mode 100644
index 9330407..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A class facilitating the asynchronous communication among threads.
- */
-public class FutureNotifier {
-	/** A future reference. */
-	private final AtomicReference<CompletableFuture<Void>> futureRef;
-
-	public FutureNotifier() {
-		this.futureRef = new AtomicReference<>(null);
-	}
-
-	/**
-	 * Get the future out of this notifier. The future will be completed when someone invokes
-	 * {@link #notifyComplete()}. If there is already an uncompleted future, that existing
-	 * future will be returned instead of a new one.
-	 *
-	 * @return a future that will be completed when {@link #notifyComplete()} is invoked.
-	 */
-	public CompletableFuture<Void> future() {
-		CompletableFuture<Void> prevFuture = futureRef.get();
-		if (prevFuture != null) {
-			// Someone has created a future for us, don't create a new one.
-			return prevFuture;
-		} else {
-			CompletableFuture<Void> newFuture = new CompletableFuture<>();
-			boolean newFutureSet = futureRef.compareAndSet(null, newFuture);
-			// If someone created a future after our previous check, use that future.
-			// Otherwise, use the new future.
-			return newFutureSet ? newFuture : future();
-		}
-	}
-
-	/**
-	 * Complete the future if there is one. This will release the thread that is waiting for data.
-	 */
-	public void notifyComplete() {
-		CompletableFuture<Void> future = futureRef.get();
-		// If there are multiple threads trying to complete the future, only the first one succeeds.
-		if (future != null && future.complete(null)) {
-			futureRef.compareAndSet(future, null);
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 0ec4297..84eeb4e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,12 +61,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		expectedException.expectMessage("One or more fetchers have encountered exception");
 		final String errMsg = "Testing Exception";
 
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 		// We have to handle split changes first, otherwise fetch will not be called.
 		try (MockSourceReader reader = new MockSourceReader(
-			futureNotifier,
 			elementsQueue,
 			() -> new SplitReader<int[], MockSourceSplit>() {
 				@Override
@@ -127,13 +124,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 
 	@Override
 	protected MockSourceReader createReader() {
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 		MockSplitReader mockSplitReader =
 			new MockSplitReader(2, true, true);
 		return new MockSourceReader(
-			futureNotifier,
 			elementsQueue,
 			() -> mockSplitReader,
 			getConfig(),
@@ -183,12 +178,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		final String splitId,
 		final RecordsWithSplitIds<E> records) throws Exception {
 
-		final FutureNotifier futureNotifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 
 		final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
-			futureNotifier,
 			elementsQueue,
 			() -> new TestingSplitReader<E, TestingSourceSplit>(records),
 			new PassThroughRecordEmitter<E, TestingSourceSplit>(),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 6e27d95..c25490b 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
@@ -84,28 +83,28 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesWhenGoingIdle() {
-		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
 			"test-split",
-			new FutureCompletingBlockingQueue<>(notifier),
+			queue,
 			new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
 		fetcher.runOnce();
 
 		assertTrue(fetcher.assignedSplits().isEmpty());
 		assertTrue(fetcher.isIdle());
-		assertTrue(notifier.future().isDone());
+		assertTrue(queue.getAvailabilityFuture().isDone());
 	}
 
 	@Test
 	public void testNotifiesOlderFutureWhenGoingIdle() {
-		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
-			"test-split",
-			new FutureCompletingBlockingQueue<>(notifier),
-			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+				"test-split",
+				queue,
+				new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
-		final CompletableFuture<?> future = notifier.future();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
 
 		fetcher.runOnce();
 
@@ -116,9 +115,8 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
-		final FutureNotifier notifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-				new FutureCompletingBlockingQueue<>(notifier);
+				new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
 			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
@@ -128,7 +126,7 @@ public class SplitFetcherTest {
 		try {
 			fetcher.runOnce();
 
-			assertTrue(notifier.future().isDone());
+			assertTrue(queue.getAvailabilityFuture().isDone());
 		} finally {
 			queueDrainer.shutdown();
 		}
@@ -136,16 +134,15 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
-		final FutureNotifier notifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-			new FutureCompletingBlockingQueue<>(notifier);
+				new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
-			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+				"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
 		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
 		queueDrainer.start();
 
-		final CompletableFuture<?> future = notifier.future();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
 
 		try {
 			fetcher.runOnce();
@@ -164,7 +161,7 @@ public class SplitFetcherTest {
 		final int numTotalRecords = numRecordsPerSplit * numSplits;
 
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
-			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
+			new FutureCompletingBlockingQueue<>(1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
 				new SplitFetcher<>(
 						0,
@@ -243,7 +240,7 @@ public class SplitFetcherTest {
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
 			final SplitReader<E, TestingSourceSplit> reader) {
-		return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+		return createFetcher(reader, new FutureCompletingBlockingQueue<>());
 	}
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
@@ -255,7 +252,7 @@ public class SplitFetcherTest {
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
 			final String splitId,
 			final SplitReader<E, TestingSourceSplit> reader) {
-		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(), reader);
 	}
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
@@ -292,6 +289,7 @@ public class SplitFetcherTest {
 					queue.take();
 				}
 				catch (InterruptedException ignored) {
+					Thread.currentThread().interrupt();
 					// fall through the loop
 				}
 			}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index ae46286..2681e5a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -68,15 +67,13 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc
 
 	@Override
 	public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-				new FutureCompletingBlockingQueue<>(futureNotifier);
+				new FutureCompletingBlockingQueue<>();
 
 		Configuration config = new Configuration();
 		config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
 		config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
 		return new MockSourceReader(
-				futureNotifier,
 				elementsQueue,
 				() -> new MockSplitReader(2, true, true),
 				config,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 92a19ef..66022db 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -25,7 +25,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,12 +36,11 @@ import java.util.function.Supplier;
 public class MockSourceReader
 		extends SingleThreadMultiplexSourceReaderBase<int[], Integer, MockSourceSplit, AtomicInteger> {
 
-	public MockSourceReader(FutureNotifier futureNotifier,
-							FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
+	public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
 							Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
 							Configuration config,
 							SourceReaderContext context) {
-		super(futureNotifier, elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
+		super(elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index c1bde50..ef056d9e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,10 +45,9 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testBasics() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
 
-		CompletableFuture<Void> future = futureNotifier.future();
+		CompletableFuture<Void> future = queue.getAvailabilityFuture();
 		assertTrue(queue.isEmpty());
 		assertEquals(0, queue.size());
 
@@ -66,8 +68,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testPoll() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
 		queue.put(0, 1234);
 		Integer value = queue.poll();
 		assertNotNull(value);
@@ -76,8 +77,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testWakeUpPut() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
 
 		CountDownLatch latch = new CountDownLatch(1);
 		new Thread(() -> {
@@ -97,8 +97,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testConcurrency() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
 		final int numValuesPerThread = 10000;
 		final int numPuttingThreads = 5;
 		List<Thread> threads = new ArrayList<>();
@@ -146,12 +145,21 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testFutureCompletingBlockingQueueConstructor() {
-		FutureNotifier notifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier);
-		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>();
+		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
 		// The capacity of the queue needs to be equal to 10000
 		assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
 		// The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
 		assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
 	}
+
+	/**
+	 * This test is to guard that our reflection is not broken and we do not lose the
+	 * performance advantage. This is possible, because the tests depend on the runtime modules
+	 * while the main scope does not.
+	 */
+	@Test
+	public void testQueueUsesShortCircuitFuture() {
+		assertSame(AvailabilityProvider.AVAILABLE, FutureCompletingBlockingQueue.AVAILABLE);
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
deleted file mode 100644
index b257ebf..0000000
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The unit tests for {@link FutureNotifier}.
- */
-public class FutureNotifierTest {
-
-	@Test
-	public void testGetFuture() {
-		FutureNotifier notifier = new FutureNotifier();
-		CompletableFuture<Void> future = notifier.future();
-		// The future should not be null.
-		assertNotNull(future);
-		// Calling the future again should return the same future.
-		assertEquals(future, notifier.future());
-	}
-
-	@Test
-	public void testCompleteFuture() {
-		FutureNotifier notifier = new FutureNotifier();
-		CompletableFuture<Void> future = notifier.future();
-		assertFalse(future.isDone());
-		notifier.notifyComplete();
-		assertTrue(future.isDone());
-	}
-
-	@Test
-	public void testConcurrency() throws InterruptedException, ExecutionException {
-		final int times = 1_000_000;
-		final int nThreads = 5;
-		FutureNotifier notifier = new FutureNotifier();
-		// A thread pool that simply gets futures out of the notifier.
-		ExecutorService listenerExecutor = Executors.newFixedThreadPool(nThreads);
-		// A thread pool that completes the futures.
-		ExecutorService notifierExecutor = Executors.newFixedThreadPool(nThreads);
-
-		CountDownLatch runningListeners = new CountDownLatch(nThreads);
-		CountDownLatch startCommand = new CountDownLatch(1);
-		CountDownLatch finishLine = new CountDownLatch(1);
-
-		List<Future<?>> executionFutures = new ArrayList<>();
-		// Start nThreads thread getting futures out of the notifier.
-		for (int i = 0; i < nThreads; i++) {
-			executionFutures.add(listenerExecutor.submit(() -> {
-				try {
-					List<CompletableFuture<Void>> futures = new ArrayList<>(times);
-					startCommand.await();
-					for (int j = 0; j < times; j++) {
-						futures.add(notifier.future());
-					}
-					runningListeners.countDown();
-					// Wait for the notifying thread to finish.
-					finishLine.await();
-					// All the futures should have been completed.
-					futures.forEach(f -> {
-						assertNotNull(f);
-						assertTrue(f.isDone());
-					});
-				} catch (Exception e) {
-					fail();
-				}
-			}));
-		}
-
-		// Start nThreads thread notifying the completion.
-		for (int i = 0; i < nThreads; i++) {
-			notifierExecutor.submit(() -> {
-				try {
-					startCommand.await();
-					while (runningListeners.getCount() > 0) {
-						notifier.notifyComplete();
-					}
-					notifier.notifyComplete();
-					finishLine.countDown();
-				} catch (Exception e) {
-					fail();
-				}
-			});
-		}
-
-		// Kick off the threads.
-		startCommand.countDown();
-
-		try {
-			for (Future<?> executionFuture : executionFutures) {
-				executionFuture.get();
-			}
-		} finally {
-			listenerExecutor.shutdown();
-			notifierExecutor.shutdown();
-			listenerExecutor.awaitTermination(30L, TimeUnit.SECONDS);
-			notifierExecutor.awaitTermination(30L, TimeUnit.SECONDS);
-		}
-	}
-}