You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:52 UTC

[flink] 04/11: [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a5b0d3297748c1be47ad579a88f24df2255a8df1
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 14 23:53:21 2020 +0200

    [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
    
    This closes #13366
---
 .../base/source/reader/SourceReaderBase.java       |   3 +-
 .../base/source/reader/fetcher/FetchTask.java      |  30 +--
 .../base/source/reader/fetcher/SplitFetcher.java   |  26 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   3 +-
 .../FutureCompletingBlockingQueue.java             | 268 +++++++++++++++++----
 .../source/reader/fetcher/SplitFetcherTest.java    |   8 +-
 .../source/reader/mocks/TestingSplitReader.java    |  13 +-
 .../FutureCompletingBlockingQueueTest.java         | 117 ++++++++-
 8 files changed, 378 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 02b7a7c..979afb2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -43,7 +43,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private final FutureNotifier futureNotifier;
 
 	/** A queue to buffer the elements fetched by the fetcher thread. */
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** The state of the splits. */
 	private final Map<String, SplitContext<T, SplitStateT>> splitStates;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 30835ce..530add1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -21,10 +21,10 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -32,22 +32,22 @@ import java.util.function.Consumer;
  */
 class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	private final SplitReader<E, SplitT> splitReader;
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 	private final Consumer<Collection<String>> splitFinishedCallback;
-	private final Thread runningThread;
+	private final int fetcherIndex;
 	private volatile RecordsWithSplitIds<E> lastRecords;
 	private volatile boolean wakeup;
 
 	FetchTask(
-		SplitReader<E, SplitT> splitReader,
-		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		Consumer<Collection<String>> splitFinishedCallback,
-		Thread runningThread) {
+			SplitReader<E, SplitT> splitReader,
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			Consumer<Collection<String>> splitFinishedCallback,
+			int fetcherIndex) {
 		this.splitReader = splitReader;
 		this.elementsQueue = elementsQueue;
 		this.splitFinishedCallback = splitFinishedCallback;
 		this.lastRecords = null;
-		this.runningThread = runningThread;
+		this.fetcherIndex = fetcherIndex;
 		this.wakeup = false;
 	}
 
@@ -61,10 +61,11 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			if (!isWakenUp()) {
 				// The order matters here. We must first put the last records into the queue.
 				// This ensures the handling of the fetched records is atomic to wakeup.
-				elementsQueue.put(lastRecords);
-				// The callback does not throw InterruptedException.
-				splitFinishedCallback.accept(lastRecords.finishedSplits());
-				lastRecords = null;
+				if (elementsQueue.put(fetcherIndex, lastRecords)) {
+					// The callback does not throw InterruptedException.
+					splitFinishedCallback.accept(lastRecords.finishedSplits());
+					lastRecords = null;
+				}
 			}
 		} finally {
 			// clean up the potential wakeup effect. It is possible that the fetcher is waken up
@@ -72,7 +73,6 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			// running thread will be interrupted. The next invocation of run() will see that and
 			// just skip.
 			if (isWakenUp()) {
-				Thread.interrupted();
 				wakeup = false;
 			}
 		}
@@ -93,12 +93,12 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			splitReader.wakeUp();
 		} else {
 			// The task might be blocking on enqueuing the records, just interrupt.
-			runningThread.interrupt();
+			elementsQueue.wakeUpPuttingThread(fetcherIndex);
 		}
 	}
 
 	private boolean isWakenUp() {
-		return wakeup || runningThread.isInterrupted();
+		return wakeup;
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 35deeba..fa1442e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private final Map<String, SplitT> assignedSplits;
 	/** The current split assignments for this fetcher. */
 	private final Queue<SplitsChange<SplitT>> splitChanges;
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 	private final SplitReader<E, SplitT> splitReader;
 	private final Runnable shutdownHook;
 	private final AtomicBoolean wakeUp;
 	private final AtomicBoolean closed;
 	private FetchTask<E, SplitT> fetchTask;
-	private volatile Thread runningThread;
 	private volatile SplitFetcherTask runningTask = null;
 	private volatile boolean isIdle;
 
 	SplitFetcher(
-		int id,
-		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		SplitReader<E, SplitT> splitReader,
-		Runnable shutdownHook) {
+			int id,
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			SplitReader<E, SplitT> splitReader,
+			Runnable shutdownHook) {
 
 		this.id = id;
 		this.taskQueue = new LinkedBlockingDeque<>();
@@ -83,14 +82,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		LOG.info("Starting split fetcher {}", id);
 		try {
 			// Remove the split from the assignments if it is already done.
-			runningThread = Thread.currentThread();
 			this.fetchTask = new FetchTask<>(
-				splitReader,
-				elementsQueue,
-				ids -> {
-					ids.forEach(this::removeAssignedSplit);
-					updateIsIdle();
-				}, runningThread);
+					splitReader,
+					elementsQueue,
+					ids -> {
+						ids.forEach(assignedSplits::remove);
+						updateIsIdle();
+					}, id);
 			while (!closed.get()) {
 				runOnce();
 			}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 61bada1..822a9a9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 	private final AtomicReference<Throwable> uncaughtFetcherException;
 
 	/** The element queue that the split fetchers will put elements into. */
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** A map keeping track of all the split fetchers. */
 	protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index de51af1..ea0f030 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,83 +18,257 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A BlockingQueue that allows a consuming thread to be notified asynchronously on element
- * availability when the queue is empty.
- *
- * <p>Implementation wise, it is a subclass of {@link LinkedBlockingQueue} that ensures all
- * the methods adding elements into the queue will complete the elements availability future.
- *
- * <p>The overriding methods must first put the elements into the queue then check and complete
- * the future if needed. This is required to ensure the thread waiting for more messages will
- * not lose a notification.
+ * A custom implementation of blocking queue with the following features.
+ * <ul>
+ *     <li>
+ *         It allows a consuming thread to be notified asynchronously on element availability when the
+ *         queue is empty.
+ *     </li>
+ *     <li>
+ *         Allows the putting threads to be gracefully waken up without interruption.
+ *     </li>
+ * </ul>
  *
  * @param <T> the type of the elements in the queue.
  */
-public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
-
+public class FutureCompletingBlockingQueue<T> {
+	private final int capacity;
 	private final FutureNotifier futureNotifier;
+
+	/** The element queue. */
+	private final Queue<T> queue;
+	/** The lock for synchronization. */
+	private final Lock lock;
+	/** The per-thread conditions that are waiting on putting elements. */
+	private final Queue<Condition> notFull;
+	/** The shared conditions for getting elements. */
+	private final Condition notEmpty;
+	/** The per-thread conditions and wakeUp flags. */
+	private ConditionAndFlag[] putConditionAndFlags;
+
 	/**
-	 * The default capacity for {@link LinkedBlockingQueue}.
+	 * The default capacity for the queue.
 	 */
-	private static final Integer DEFAULT_CAPACITY = 10000;
+	private static final Integer DEFAULT_CAPACITY = 1;
 
 	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
 		this(futureNotifier, DEFAULT_CAPACITY);
 	}
 
 	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
-		super(capacity);
+		this.capacity = capacity;
 		this.futureNotifier = futureNotifier;
+		this.queue = new ArrayDeque<>(capacity);
+		this.lock = new ReentrantLock();
+		this.putConditionAndFlags = new ConditionAndFlag[1];
+		this.notFull = new ArrayDeque<>();
+		this.notEmpty = lock.newCondition();
+	}
+
+	/**
+	 * Put an element into the queue. The thread blocks if the queue is full.
+	 *
+	 * @param threadIndex the index of the thread.
+	 * @param element the element to put.
+	 * @return true if the element has been successfully put into the queue, false otherwise.
+	 * @throws InterruptedException when the thread is interrupted.
+	 */
+	public boolean put(int threadIndex, T element) throws InterruptedException {
+		if (element == null) {
+			throw new NullPointerException();
+		}
+		lock.lockInterruptibly();
+		try {
+			while (queue.size() >= capacity) {
+				if (getAndResetWakeUpFlag(threadIndex)) {
+					return false;
+				}
+				waitOnPut(threadIndex);
+			}
+			enqueue(element);
+			return true;
+		} finally {
+			lock.unlock();
+		}
 	}
 
-	@Override
-	public void put(T t) throws InterruptedException {
-		super.put(t);
+	/**
+	 * Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 *
+	 * @return the first element in the queue.
+	 * @throws InterruptedException when the thread is interrupted.
+	 */
+	public T take() throws InterruptedException{
+		lock.lock();
+		try {
+			while (queue.size() == 0) {
+				notEmpty.await();
+			}
+			return dequeue();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Get and remove the first element from the queue. Null is retuned if the queue is empty.
+	 *
+	 * @return the first element from the queue, or Null if the queue is empty.
+	 */
+	public T poll() {
+		lock.lock();
+		try {
+			if (queue.size() == 0) {
+				return null;
+			}
+			return dequeue();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Get the first element from the queue without removing it.
+	 *
+	 * @return the first element in the queue, or Null if the queue is empty.
+	 */
+	public T peek() {
+		lock.lock();
+		try {
+			return queue.peek();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public int size() {
+		lock.lock();
+		try {
+			return queue.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public boolean isEmpty() {
+		lock.lock();
+		try {
+			return queue.isEmpty();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public int remainingCapacity() {
+		lock.lock();
+		try {
+			return capacity - queue.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public void wakeUpPuttingThread(int threadIndex) {
+		lock.lock();
+		try {
+			maybeCreateCondition(threadIndex);
+			ConditionAndFlag caf = putConditionAndFlags[threadIndex];
+			if (caf != null) {
+				caf.setWakeUp(true);
+				caf.condition().signal();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	// --------------- private helpers -------------------------
+
+	private void enqueue(T element) {
+		int sizeBefore = queue.size();
+		queue.add(element);
 		futureNotifier.notifyComplete();
+		if (sizeBefore == 0) {
+			notEmpty.signal();
+		}
+		if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
+			signalNextPutter();
+		}
 	}
 
-	@Override
-	public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
-		if (super.offer(t, timeout, unit)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private T dequeue() {
+		int sizeBefore = queue.size();
+		T element = queue.poll();
+		if (sizeBefore == capacity && !notFull.isEmpty()) {
+			signalNextPutter();
 		}
+		if (sizeBefore > 1) {
+			notEmpty.signal();
+		}
+		return element;
 	}
 
-	@Override
-	public boolean offer(T t) {
-		if (super.offer(t)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private void waitOnPut(int fetcherIndex) throws InterruptedException {
+		maybeCreateCondition(fetcherIndex);
+		Condition cond = putConditionAndFlags[fetcherIndex].condition();
+		notFull.add(cond);
+		cond.await();
+	}
+
+	private void signalNextPutter() {
+		if (!notFull.isEmpty()) {
+			notFull.poll().signal();
 		}
 	}
 
-	@Override
-	public boolean add(T t) {
-		if (super.add(t)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private void maybeCreateCondition(int threadIndex) {
+		if (putConditionAndFlags.length < threadIndex + 1) {
+			putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
+		}
+
+		if (putConditionAndFlags[threadIndex] == null) {
+			putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition());
 		}
 	}
 
-	@Override
-	public boolean addAll(Collection<? extends T> c) {
-		if (super.addAll(c)) {
-			futureNotifier.notifyComplete();
+	private boolean getAndResetWakeUpFlag(int threadIndex) {
+		maybeCreateCondition(threadIndex);
+		if (putConditionAndFlags[threadIndex].getWakeUp()) {
+			putConditionAndFlags[threadIndex].setWakeUp(false);
 			return true;
-		} else {
-			return false;
+		}
+		return false;
+	}
+
+	// --------------- private per thread state ------------
+
+	private static class ConditionAndFlag {
+		private final Condition cond;
+		private boolean wakeUp;
+
+		private ConditionAndFlag(Condition cond) {
+			this.cond = cond;
+			this.wakeUp = false;
+		}
+
+		private Condition condition() {
+			return cond;
+		}
+
+		private boolean getWakeUp() {
+			return wakeUp;
+		}
+
+		private void setWakeUp(boolean value) {
+			wakeUp = value;
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e9c2ad2..eef8328 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Test;
 
@@ -29,8 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -45,9 +45,11 @@ public class SplitFetcherTest {
 	private static final int NUM_RECORDS_PER_SPLIT = 10_000;
 	private static final int INTERRUPT_RECORDS_INTERVAL = 10;
 	private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
+
 	@Test
 	public void testWakeup() throws InterruptedException {
-		BlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = new ArrayBlockingQueue<>(1);
+		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
+			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
 				new SplitFetcher<>(
 						0,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index ede92eb..0d202f7 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -46,11 +46,10 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 		if (!fetches.isEmpty()) {
 			return fetches.removeFirst();
 		} else {
-			// block until interrupted
+			// block until woken up
 			synchronized (fetches) {
-				while (true) {
-					fetches.wait();
-				}
+				fetches.wait();
+				return null;
 			}
 		}
 	}
@@ -61,5 +60,9 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 	}
 
 	@Override
-	public void wakeUp() {}
+	public void wakeUp() {
+		synchronized (fetches) {
+			fetches.notifyAll();
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ad74f2a..c1bde50 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -20,16 +20,129 @@ package org.apache.flink.connector.base.source.reader.synchronization;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
+	private static final Integer DEFAULT_CAPACITY = 1;
+	private static final Integer SPECIFIED_CAPACITY = 20000;
+
+	@Test
+	public void testBasics() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
 
+		CompletableFuture<Void> future = futureNotifier.future();
+		assertTrue(queue.isEmpty());
+		assertEquals(0, queue.size());
 
-	private static final Integer DEFAULT_CAPACITY = 10000;
-	private static final Integer SPECIFIED_CAPACITY = 20000;
+		queue.put(0, 1234);
+
+		assertTrue(future.isDone());
+		assertEquals(1, queue.size());
+		assertFalse(queue.isEmpty());
+		assertEquals(4, queue.remainingCapacity());
+		assertNotNull(queue.peek());
+		assertEquals(1234, (int) queue.peek());
+		assertEquals(1234, (int) queue.poll());
+
+		assertEquals(0, queue.size());
+		assertTrue(queue.isEmpty());
+		assertEquals(5, queue.remainingCapacity());
+	}
+
+	@Test
+	public void testPoll() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+		queue.put(0, 1234);
+		Integer value = queue.poll();
+		assertNotNull(value);
+		assertEquals(1234, (int) value);
+	}
+
+	@Test
+	public void testWakeUpPut() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+
+		CountDownLatch latch = new CountDownLatch(1);
+		new Thread(() -> {
+			try {
+				assertTrue(queue.put(0, 1234));
+				assertFalse(queue.put(0, 1234));
+				latch.countDown();
+			} catch (InterruptedException e) {
+				fail("Interrupted unexpectedly.");
+			}
+		}).start();
+
+		queue.wakeUpPuttingThread(0);
+		latch.await();
+		assertEquals(0, latch.getCount());
+	}
+
+	@Test
+	public void testConcurrency() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		final int numValuesPerThread = 10000;
+		final int numPuttingThreads = 5;
+		List<Thread> threads = new ArrayList<>();
+
+		for (int i = 0; i < numPuttingThreads; i++) {
+			final int index = i;
+			Thread t = new Thread(() -> {
+				for (int j = 0; j < numValuesPerThread; j++) {
+					int base = index * numValuesPerThread;
+					try {
+						queue.put(index, base + j);
+					} catch (InterruptedException e) {
+						fail("putting thread interrupted.");
+					}
+				}
+			});
+			t.start();
+			threads.add(t);
+		}
+
+		BitSet bitSet = new BitSet();
+		AtomicInteger count = new AtomicInteger(0);
+		for (int i = 0; i < 5; i++) {
+			Thread t = new Thread(() -> {
+				while (count.get() < numPuttingThreads * numValuesPerThread) {
+					Integer value = queue.poll();
+					if (value == null) {
+						continue;
+					}
+					count.incrementAndGet();
+					if (bitSet.get(value)) {
+						fail("Value " + value + " has been consumed before");
+					}
+					synchronized (bitSet) {
+						bitSet.set(value);
+					}
+				}});
+			t.start();
+			threads.add(t);
+		}
+		for (Thread t : threads) {
+			t.join();
+		}
+	}
 
 	@Test
 	public void testFutureCompletingBlockingQueueConstructor() {