You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:52 UTC
[flink] 04/11: [FLINK-17393][connectors] Wakeup the SplitFetchers
more elegantly.
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit a5b0d3297748c1be47ad579a88f24df2255a8df1
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 14 23:53:21 2020 +0200
[FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
This closes #13366
---
.../base/source/reader/SourceReaderBase.java | 3 +-
.../base/source/reader/fetcher/FetchTask.java | 30 +--
.../base/source/reader/fetcher/SplitFetcher.java | 26 +-
.../source/reader/fetcher/SplitFetcherManager.java | 3 +-
.../FutureCompletingBlockingQueue.java | 268 +++++++++++++++++----
.../source/reader/fetcher/SplitFetcherTest.java | 8 +-
.../source/reader/mocks/TestingSplitReader.java | 13 +-
.../FutureCompletingBlockingQueueTest.java | 117 ++++++++-
8 files changed, 378 insertions(+), 90 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 02b7a7c..979afb2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -43,7 +43,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import static org.apache.flink.util.Preconditions.checkState;
@@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private final FutureNotifier futureNotifier;
/** A queue to buffer the elements fetched by the fetcher thread. */
- private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
/** The state of the splits. */
private final Map<String, SplitContext<T, SplitStateT>> splitStates;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 30835ce..530add1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -21,10 +21,10 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import java.io.IOException;
import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
/**
@@ -32,22 +32,22 @@ import java.util.function.Consumer;
*/
class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private final SplitReader<E, SplitT> splitReader;
- private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final Consumer<Collection<String>> splitFinishedCallback;
- private final Thread runningThread;
+ private final int fetcherIndex;
private volatile RecordsWithSplitIds<E> lastRecords;
private volatile boolean wakeup;
FetchTask(
- SplitReader<E, SplitT> splitReader,
- BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
- Consumer<Collection<String>> splitFinishedCallback,
- Thread runningThread) {
+ SplitReader<E, SplitT> splitReader,
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+ Consumer<Collection<String>> splitFinishedCallback,
+ int fetcherIndex) {
this.splitReader = splitReader;
this.elementsQueue = elementsQueue;
this.splitFinishedCallback = splitFinishedCallback;
this.lastRecords = null;
- this.runningThread = runningThread;
+ this.fetcherIndex = fetcherIndex;
this.wakeup = false;
}
@@ -61,10 +61,11 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
if (!isWakenUp()) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic to wakeup.
- elementsQueue.put(lastRecords);
- // The callback does not throw InterruptedException.
- splitFinishedCallback.accept(lastRecords.finishedSplits());
- lastRecords = null;
+ if (elementsQueue.put(fetcherIndex, lastRecords)) {
+ // The callback does not throw InterruptedException.
+ splitFinishedCallback.accept(lastRecords.finishedSplits());
+ lastRecords = null;
+ }
}
} finally {
// clean up the potential wakeup effect. It is possible that the fetcher is waken up
@@ -72,7 +73,6 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
// running thread will be interrupted. The next invocation of run() will see that and
// just skip.
if (isWakenUp()) {
- Thread.interrupted();
wakeup = false;
}
}
@@ -93,12 +93,12 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
splitReader.wakeUp();
} else {
// The task might be blocking on enqueuing the records, just interrupt.
- runningThread.interrupt();
+ elementsQueue.wakeUpPuttingThread(fetcherIndex);
}
}
private boolean isWakenUp() {
- return wakeup || runningThread.isInterrupted();
+ return wakeup;
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 35deeba..fa1442e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
private final Map<String, SplitT> assignedSplits;
/** The current split assignments for this fetcher. */
private final Queue<SplitsChange<SplitT>> splitChanges;
- private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final SplitReader<E, SplitT> splitReader;
private final Runnable shutdownHook;
private final AtomicBoolean wakeUp;
private final AtomicBoolean closed;
private FetchTask<E, SplitT> fetchTask;
- private volatile Thread runningThread;
private volatile SplitFetcherTask runningTask = null;
private volatile boolean isIdle;
SplitFetcher(
- int id,
- BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
- SplitReader<E, SplitT> splitReader,
- Runnable shutdownHook) {
+ int id,
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+ SplitReader<E, SplitT> splitReader,
+ Runnable shutdownHook) {
this.id = id;
this.taskQueue = new LinkedBlockingDeque<>();
@@ -83,14 +82,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
LOG.info("Starting split fetcher {}", id);
try {
// Remove the split from the assignments if it is already done.
- runningThread = Thread.currentThread();
this.fetchTask = new FetchTask<>(
- splitReader,
- elementsQueue,
- ids -> {
- ids.forEach(this::removeAssignedSplit);
- updateIsIdle();
- }, runningThread);
+ splitReader,
+ elementsQueue,
+ ids -> {
+ ids.forEach(assignedSplits::remove);
+ updateIsIdle();
+ }, id);
while (!closed.get()) {
runOnce();
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 61bada1..822a9a9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
private final AtomicReference<Throwable> uncaughtFetcherException;
/** The element queue that the split fetchers will put elements into. */
- private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+ private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
/** A map keeping track of all the split fetchers. */
protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index de51af1..ea0f030 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,83 +18,257 @@
package org.apache.flink.connector.base.source.reader.synchronization;
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
/**
- * A BlockingQueue that allows a consuming thread to be notified asynchronously on element
- * availability when the queue is empty.
- *
- * <p>Implementation wise, it is a subclass of {@link LinkedBlockingQueue} that ensures all
- * the methods adding elements into the queue will complete the elements availability future.
- *
- * <p>The overriding methods must first put the elements into the queue then check and complete
- * the future if needed. This is required to ensure the thread waiting for more messages will
- * not lose a notification.
+ * A custom implementation of blocking queue with the following features.
+ * <ul>
+ * <li>
+ * It allows a consuming thread to be notified asynchronously on element availability when the
+ * queue is empty.
+ * </li>
+ * <li>
+ * Allows the putting threads to be gracefully waken up without interruption.
+ * </li>
+ * </ul>
*
* @param <T> the type of the elements in the queue.
*/
-public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
-
+public class FutureCompletingBlockingQueue<T> {
+ private final int capacity;
private final FutureNotifier futureNotifier;
+
+ /** The element queue. */
+ private final Queue<T> queue;
+ /** The lock for synchronization. */
+ private final Lock lock;
+ /** The per-thread conditions that are waiting on putting elements. */
+ private final Queue<Condition> notFull;
+ /** The shared conditions for getting elements. */
+ private final Condition notEmpty;
+ /** The per-thread conditions and wakeUp flags. */
+ private ConditionAndFlag[] putConditionAndFlags;
+
/**
- * The default capacity for {@link LinkedBlockingQueue}.
+ * The default capacity for the queue.
*/
- private static final Integer DEFAULT_CAPACITY = 10000;
+ private static final Integer DEFAULT_CAPACITY = 1;
public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
this(futureNotifier, DEFAULT_CAPACITY);
}
public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
- super(capacity);
+ this.capacity = capacity;
this.futureNotifier = futureNotifier;
+ this.queue = new ArrayDeque<>(capacity);
+ this.lock = new ReentrantLock();
+ this.putConditionAndFlags = new ConditionAndFlag[1];
+ this.notFull = new ArrayDeque<>();
+ this.notEmpty = lock.newCondition();
+ }
+
+ /**
+ * Put an element into the queue. The thread blocks if the queue is full.
+ *
+ * @param threadIndex the index of the thread.
+ * @param element the element to put.
+ * @return true if the element has been successfully put into the queue, false otherwise.
+ * @throws InterruptedException when the thread is interrupted.
+ */
+ public boolean put(int threadIndex, T element) throws InterruptedException {
+ if (element == null) {
+ throw new NullPointerException();
+ }
+ lock.lockInterruptibly();
+ try {
+ while (queue.size() >= capacity) {
+ if (getAndResetWakeUpFlag(threadIndex)) {
+ return false;
+ }
+ waitOnPut(threadIndex);
+ }
+ enqueue(element);
+ return true;
+ } finally {
+ lock.unlock();
+ }
}
- @Override
- public void put(T t) throws InterruptedException {
- super.put(t);
+ /**
+ * Get and remove the first element from the queue. The call blocks if the queue is empty.
+ *
+ * @return the first element in the queue.
+ * @throws InterruptedException when the thread is interrupted.
+ */
+ public T take() throws InterruptedException{
+ lock.lock();
+ try {
+ while (queue.size() == 0) {
+ notEmpty.await();
+ }
+ return dequeue();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Get and remove the first element from the queue. Null is retuned if the queue is empty.
+ *
+ * @return the first element from the queue, or Null if the queue is empty.
+ */
+ public T poll() {
+ lock.lock();
+ try {
+ if (queue.size() == 0) {
+ return null;
+ }
+ return dequeue();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Get the first element from the queue without removing it.
+ *
+ * @return the first element in the queue, or Null if the queue is empty.
+ */
+ public T peek() {
+ lock.lock();
+ try {
+ return queue.peek();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int size() {
+ lock.lock();
+ try {
+ return queue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public boolean isEmpty() {
+ lock.lock();
+ try {
+ return queue.isEmpty();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public int remainingCapacity() {
+ lock.lock();
+ try {
+ return capacity - queue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ public void wakeUpPuttingThread(int threadIndex) {
+ lock.lock();
+ try {
+ maybeCreateCondition(threadIndex);
+ ConditionAndFlag caf = putConditionAndFlags[threadIndex];
+ if (caf != null) {
+ caf.setWakeUp(true);
+ caf.condition().signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ // --------------- private helpers -------------------------
+
+ private void enqueue(T element) {
+ int sizeBefore = queue.size();
+ queue.add(element);
futureNotifier.notifyComplete();
+ if (sizeBefore == 0) {
+ notEmpty.signal();
+ }
+ if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
+ signalNextPutter();
+ }
}
- @Override
- public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
- if (super.offer(t, timeout, unit)) {
- futureNotifier.notifyComplete();
- return true;
- } else {
- return false;
+ private T dequeue() {
+ int sizeBefore = queue.size();
+ T element = queue.poll();
+ if (sizeBefore == capacity && !notFull.isEmpty()) {
+ signalNextPutter();
}
+ if (sizeBefore > 1) {
+ notEmpty.signal();
+ }
+ return element;
}
- @Override
- public boolean offer(T t) {
- if (super.offer(t)) {
- futureNotifier.notifyComplete();
- return true;
- } else {
- return false;
+ private void waitOnPut(int fetcherIndex) throws InterruptedException {
+ maybeCreateCondition(fetcherIndex);
+ Condition cond = putConditionAndFlags[fetcherIndex].condition();
+ notFull.add(cond);
+ cond.await();
+ }
+
+ private void signalNextPutter() {
+ if (!notFull.isEmpty()) {
+ notFull.poll().signal();
}
}
- @Override
- public boolean add(T t) {
- if (super.add(t)) {
- futureNotifier.notifyComplete();
- return true;
- } else {
- return false;
+ private void maybeCreateCondition(int threadIndex) {
+ if (putConditionAndFlags.length < threadIndex + 1) {
+ putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
+ }
+
+ if (putConditionAndFlags[threadIndex] == null) {
+ putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition());
}
}
- @Override
- public boolean addAll(Collection<? extends T> c) {
- if (super.addAll(c)) {
- futureNotifier.notifyComplete();
+ private boolean getAndResetWakeUpFlag(int threadIndex) {
+ maybeCreateCondition(threadIndex);
+ if (putConditionAndFlags[threadIndex].getWakeUp()) {
+ putConditionAndFlags[threadIndex].setWakeUp(false);
return true;
- } else {
- return false;
+ }
+ return false;
+ }
+
+ // --------------- private per thread state ------------
+
+ private static class ConditionAndFlag {
+ private final Condition cond;
+ private boolean wakeUp;
+
+ private ConditionAndFlag(Condition cond) {
+ this.cond = cond;
+ this.wakeUp = false;
+ }
+
+ private Condition condition() {
+ return cond;
+ }
+
+ private boolean getWakeUp() {
+ return wakeUp;
+ }
+
+ private void setWakeUp(boolean value) {
+ wakeUp = value;
}
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e9c2ad2..eef8328 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.base.source.reader.fetcher;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.junit.Test;
@@ -29,8 +31,6 @@ import java.util.Collections;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,9 +45,11 @@ public class SplitFetcherTest {
private static final int NUM_RECORDS_PER_SPLIT = 10_000;
private static final int INTERRUPT_RECORDS_INTERVAL = 10;
private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
+
@Test
public void testWakeup() throws InterruptedException {
- BlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = new ArrayBlockingQueue<>(1);
+ FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
+ new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
SplitFetcher<int[], MockSourceSplit> fetcher =
new SplitFetcher<>(
0,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index ede92eb..0d202f7 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -46,11 +46,10 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
if (!fetches.isEmpty()) {
return fetches.removeFirst();
} else {
- // block until interrupted
+ // block until woken up
synchronized (fetches) {
- while (true) {
- fetches.wait();
- }
+ fetches.wait();
+ return null;
}
}
}
@@ -61,5 +60,9 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
}
@Override
- public void wakeUp() {}
+ public void wakeUp() {
+ synchronized (fetches) {
+ fetches.notifyAll();
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ad74f2a..c1bde50 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -20,16 +20,129 @@ package org.apache.flink.connector.base.source.reader.synchronization;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
/**
* The unit test for {@link FutureCompletingBlockingQueue}.
*/
public class FutureCompletingBlockingQueueTest {
+ private static final Integer DEFAULT_CAPACITY = 1;
+ private static final Integer SPECIFIED_CAPACITY = 20000;
+
+ @Test
+ public void testBasics() throws InterruptedException {
+ FutureNotifier futureNotifier = new FutureNotifier();
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+ CompletableFuture<Void> future = futureNotifier.future();
+ assertTrue(queue.isEmpty());
+ assertEquals(0, queue.size());
- private static final Integer DEFAULT_CAPACITY = 10000;
- private static final Integer SPECIFIED_CAPACITY = 20000;
+ queue.put(0, 1234);
+
+ assertTrue(future.isDone());
+ assertEquals(1, queue.size());
+ assertFalse(queue.isEmpty());
+ assertEquals(4, queue.remainingCapacity());
+ assertNotNull(queue.peek());
+ assertEquals(1234, (int) queue.peek());
+ assertEquals(1234, (int) queue.poll());
+
+ assertEquals(0, queue.size());
+ assertTrue(queue.isEmpty());
+ assertEquals(5, queue.remainingCapacity());
+ }
+
+ @Test
+ public void testPoll() throws InterruptedException {
+ FutureNotifier futureNotifier = new FutureNotifier();
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+ queue.put(0, 1234);
+ Integer value = queue.poll();
+ assertNotNull(value);
+ assertEquals(1234, (int) value);
+ }
+
+ @Test
+ public void testWakeUpPut() throws InterruptedException {
+ FutureNotifier futureNotifier = new FutureNotifier();
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ new Thread(() -> {
+ try {
+ assertTrue(queue.put(0, 1234));
+ assertFalse(queue.put(0, 1234));
+ latch.countDown();
+ } catch (InterruptedException e) {
+ fail("Interrupted unexpectedly.");
+ }
+ }).start();
+
+ queue.wakeUpPuttingThread(0);
+ latch.await();
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test
+ public void testConcurrency() throws InterruptedException {
+ FutureNotifier futureNotifier = new FutureNotifier();
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+ final int numValuesPerThread = 10000;
+ final int numPuttingThreads = 5;
+ List<Thread> threads = new ArrayList<>();
+
+ for (int i = 0; i < numPuttingThreads; i++) {
+ final int index = i;
+ Thread t = new Thread(() -> {
+ for (int j = 0; j < numValuesPerThread; j++) {
+ int base = index * numValuesPerThread;
+ try {
+ queue.put(index, base + j);
+ } catch (InterruptedException e) {
+ fail("putting thread interrupted.");
+ }
+ }
+ });
+ t.start();
+ threads.add(t);
+ }
+
+ BitSet bitSet = new BitSet();
+ AtomicInteger count = new AtomicInteger(0);
+ for (int i = 0; i < 5; i++) {
+ Thread t = new Thread(() -> {
+ while (count.get() < numPuttingThreads * numValuesPerThread) {
+ Integer value = queue.poll();
+ if (value == null) {
+ continue;
+ }
+ count.incrementAndGet();
+ if (bitSet.get(value)) {
+ fail("Value " + value + " has been consumed before");
+ }
+ synchronized (bitSet) {
+ bitSet.set(value);
+ }
+ }});
+ t.start();
+ threads.add(t);
+ }
+ for (Thread t : threads) {
+ t.join();
+ }
+ }
@Test
public void testFutureCompletingBlockingQueueConstructor() {