You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:57 UTC
[flink] 09/11: [FLINK-19223][connectors] Simplify Availability
Future Model in Base Connector
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4ea95782b4c6a2538153d4d16ad3f4839c7de0fb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 23:48:01 2020 +0200
[FLINK-19223][connectors] Simplify Availability Future Model in Base Connector
This implements a model closer to the AvailabilityListener and AvailabilityHelper in the flink-runtime.
This closes #13385
---
.../SingleThreadMultiplexSourceReaderBase.java | 5 +-
.../base/source/reader/SourceReaderBase.java | 21 +-
.../reader/fetcher/SingleThreadFetcherManager.java | 4 +-
.../source/reader/fetcher/SplitFetcherManager.java | 5 +-
.../FutureCompletingBlockingQueue.java | 259 +++++++++++++++++----
.../reader/synchronization/FutureNotifier.java | 66 ------
.../base/source/reader/SourceReaderBaseTest.java | 13 +-
.../source/reader/fetcher/SplitFetcherTest.java | 36 ++-
.../base/source/reader/mocks/MockBaseSource.java | 5 +-
.../base/source/reader/mocks/MockSourceReader.java | 6 +-
.../FutureCompletingBlockingQueueTest.java | 32 ++-
.../reader/synchronization/FutureNotifierTest.java | 131 -----------
12 files changed, 261 insertions(+), 322 deletions(-)
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 3239f28..ab87db0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import java.util.function.Supplier;
@@ -40,16 +39,14 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends
extends SourceReaderBase<E, T, SplitT, SplitStateT> {
public SingleThreadMultiplexSourceReaderBase(
- FutureNotifier futureNotifier,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(
- futureNotifier,
elementsQueue,
- new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier),
+ new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
recordEmitter,
config,
context);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 0305e2d..fb4e6df9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.core.io.InputStatus;
import org.slf4j.Logger;
@@ -61,9 +60,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
implements SourceReader<T, SplitT> {
private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
- /** A future notifier to notify when this reader requires attention. */
- private final FutureNotifier futureNotifier;
-
/** A queue to buffer the elements fetched by the fetcher thread. */
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
@@ -94,13 +90,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
private boolean noMoreSplitsAssignment;
public SourceReaderBase(
- FutureNotifier futureNotifier,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
- this.futureNotifier = futureNotifier;
this.elementsQueue = elementsQueue;
this.splitFetcherManager = splitFetcherManager;
this.recordEmitter = recordEmitter;
@@ -203,18 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
@Override
public CompletableFuture<Void> isAvailable() {
- // The order matters here. We first get the future. After this point, if the queue
- // is empty or there is no error in the split fetcher manager, we can ensure that
- // the future will be completed by the fetcher once it put an element into the element queue,
- // or it will be completed when an error occurs.
- CompletableFuture<Void> future = futureNotifier.future();
- splitFetcherManager.checkErrors();
- if (!elementsQueue.isEmpty()) {
- // The fetcher got the new elements after the last poll, or their is a finished split.
- // Simply complete the future and return;
- futureNotifier.notifyComplete();
- }
- return future;
+ return currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : elementsQueue.getAvailabilityFuture();
}
@Override
@@ -239,7 +222,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
if (sourceEvent instanceof NoMoreSplitsEvent) {
LOG.info("Reader received NoMoreSplits event.");
noMoreSplitsAssignment = true;
- futureNotifier.notifyComplete();
+ elementsQueue.notifyAvailable();
}
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index bd5879f..339c533 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import java.util.List;
import java.util.function.Supplier;
@@ -34,10 +33,9 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
extends SplitFetcherManager<E, SplitT> {
public SingleThreadFetcherManager(
- FutureNotifier futureNotifier,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
- super(futureNotifier, elementsQueue, splitReaderSupplier);
+ super(elementsQueue, splitReaderSupplier);
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 26d92e3..ffac523 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.util.ThrowableCatchingRunnable;
import org.slf4j.Logger;
@@ -79,12 +78,10 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
/**
* Create a split fetcher manager.
*
- * @param futureNotifier a notifier to notify the complete of a future.
* @param elementsQueue the queue that split readers will put elements into.
* @param splitReaderFactory a supplier that could be used to create split readers.
*/
public SplitFetcherManager(
- FutureNotifier futureNotifier,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
this.elementsQueue = elementsQueue;
@@ -96,7 +93,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
// Add the exception to the exception list.
uncaughtFetcherException.get().addSuppressed(t);
// Wake up the main thread to let it know the exception.
- futureNotifier.notifyComplete();
+ elementsQueue.notifyAvailable();
}
}
};
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index dcbb66e..c89b682 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,62 +18,174 @@
package org.apache.flink.connector.base.source.reader.synchronization;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
/**
- * A custom implementation of blocking queue with the following features.
- * <ul>
- * <li>
- * It allows a consuming thread to be notified asynchronously on element availability when the
- * queue is empty.
- * </li>
- * <li>
- * Allows the putting threads to be gracefully waken up without interruption.
- * </li>
- * </ul>
+ * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is
+ * used in the hand-over of data from a producing thread to a consuming thread.
+ * This FutureCompletingBlockingQueue has the following features:
+ *
+ * <h3>Consumer Notifications</h3>
+ *
+ * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the
+ * {@link #poll()} method, this queue offers a {@link CompletableFuture}, obtained via the
+ * {@link #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty.
+ * A consumer can thus subscribe to asynchronous notifications for availability by adding a handler
+ * to the obtained {@code CompletableFuture}.
+ *
+ * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way the
+ * consumer may be notified of a situation/condition without adding an element to the queue.
+ *
+ * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()} finds an empty queue
+ * or results in an empty queue (takes the last element).
+ *
+ * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the
+ * availability future completes despite there being no data availabile in the queue. The consumer is
+ * responsible for polling data and obtaining another future to wait on. This is similar to the way
+ * that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting threads
+ * and commonly need to be used in loop with the waiting condition.
+ *
+ * <h3>Producer Wakeup</h3>
+ *
+ * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue
+ * capacity limits, without interrupting the thread. This is done via the {@link #wakeUpPuttingThread(int)}
+ * method.
*
* @param <T> the type of the elements in the queue.
*/
public class FutureCompletingBlockingQueue<T> {
+
+ /**
+ * A constant future that is complete, indicating availability. Using this constant in cases that
+ * are guaranteed available helps short-circuiting some checks and avoiding volatile memory operations.
+ */
+ public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();
+
+ /**
+ * The default capacity for the queue.
+ */
+ private static final int DEFAULT_CAPACITY = 1;
+
+ // ------------------------------------------------------------------------
+
+ /** The maximum capacity of the queue. */
private final int capacity;
- private final FutureNotifier futureNotifier;
- /** The element queue. */
- private final Queue<T> queue;
+ /** The availability future. This doubles as a "non empty" condition. This value is never null.*/
+ private CompletableFuture<Void> currentFuture;
+
/** The lock for synchronization. */
private final Lock lock;
+
+ /** The element queue. */
+ @GuardedBy("lock")
+ private final Queue<T> queue;
+
/** The per-thread conditions that are waiting on putting elements. */
+ @GuardedBy("lock")
private final Queue<Condition> notFull;
- /** The shared conditions for getting elements. */
- private final Condition notEmpty;
+
/** The per-thread conditions and wakeUp flags. */
+ @GuardedBy("lock")
private ConditionAndFlag[] putConditionAndFlags;
- /**
- * The default capacity for the queue.
- */
- private static final Integer DEFAULT_CAPACITY = 1;
-
- public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
- this(futureNotifier, DEFAULT_CAPACITY);
+ public FutureCompletingBlockingQueue() {
+ this(DEFAULT_CAPACITY);
}
- public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
+ public FutureCompletingBlockingQueue(int capacity) {
+ checkArgument(capacity > 0, "capacity must be > 0");
this.capacity = capacity;
- this.futureNotifier = futureNotifier;
this.queue = new ArrayDeque<>(capacity);
this.lock = new ReentrantLock();
this.putConditionAndFlags = new ConditionAndFlag[1];
this.notFull = new ArrayDeque<>();
- this.notEmpty = lock.newCondition();
+
+ // initially the queue is empty and thus unavailable
+ this.currentFuture = new CompletableFuture<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // Future / Notification logic
+ // ------------------------------------------------------------------------
+
+ /**
+ * Returns the availability future. If the queue is non-empty, then this future will already
+ * be complete. Otherwise the obtained future is guaranteed to get completed the next time
+ * the queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
+ *
+ * <p>It is important that a completed future is no guarantee that the next call to
+ * {@link #poll()} will return a non-null element. If there are concurrent consumer, another
+ * consumer may have taken the available element. Or there was no element in the first place,
+ * because the future was completed through a call to {@link #notifyAvailable()}.
+ *
+ * <p>For that reason, it is important to call this method (to obtain a new future) every
+ * time again after {@link #poll()} returned null and you want to wait for data.
+ */
+ public CompletableFuture<Void> getAvailabilityFuture() {
+ return currentFuture;
+ }
+
+ /**
+ * Makes sure the availability future is complete, if it is not complete already.
+ * All futures returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to
+ * be completed.
+ *
+ * <p>All future calls to the method will return a completed future, until the point
+ * that the availability is reset via calls to {@link #poll()} that leave the queue empty.
+ */
+ public void notifyAvailable() {
+ lock.lock();
+ try {
+ moveToAvailable();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Internal utility to make sure that the current future futures are complete (until reset).
+ */
+ @GuardedBy("lock")
+ private void moveToAvailable() {
+ final CompletableFuture<Void> current = currentFuture;
+ if (current != AVAILABLE) {
+ currentFuture = AVAILABLE;
+ current.complete(null);
+ }
}
/**
+ * Makes sure the availability future is incomplete, if it was complete before.
+ */
+ @GuardedBy("lock")
+ private void moveToUnAvailable() {
+ if (currentFuture == AVAILABLE) {
+ currentFuture = new CompletableFuture<>();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Blocking Queue Logic
+ // ------------------------------------------------------------------------
+
+ /**
* Put an element into the queue. The thread blocks if the queue is full.
*
* @param threadIndex the index of the thread.
@@ -101,25 +213,40 @@ public class FutureCompletingBlockingQueue<T> {
}
/**
- * Get and remove the first element from the queue. The call blocks if the queue is empty.
+ * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience.
+ * A method that blocks until availability does not go together well with the concept of
+ * asynchronous notifications and non-blocking polling.
+ *
+ * <p>Get and remove the first element from the queue. The call blocks if the queue is empty.
+ * The problem with this method is that it may loop internally until an element is available and
+ * that way eagerly reset the availability future. If a consumer thread is blocked in taking an
+ * element, it will receive availability notifications from {@link #notifyAvailable()} and immediately
+ * reset them by calling {@link #poll()} and finding the queue empty.
*
* @return the first element in the queue.
* @throws InterruptedException when the thread is interrupted.
*/
- public T take() throws InterruptedException{
- lock.lock();
- try {
- while (queue.size() == 0) {
- notEmpty.await();
+ @VisibleForTesting
+ public T take() throws InterruptedException {
+ T next;
+ while ((next = poll()) == null) {
+ // use the future to wait for availability to avoid busy waiting
+ try {
+ getAvailabilityFuture().get();
+ } catch (ExecutionException | CompletionException e) {
+ // this should never happen, but we propagate just in case
+ throw new FlinkRuntimeException("exception in queue future completion", e);
}
- return dequeue();
- } finally {
- lock.unlock();
}
+ return next;
}
/**
- * Get and remove the first element from the queue. Null is retuned if the queue is empty.
+ * Get and remove the first element from the queue. Null is returned if the queue is empty.
+ * If this makes the queue empty (takes the last element) or finds the queue already empty,
+ * then this resets the availability notifications. The next call to {@link #getAvailabilityFuture()}
+ * will then return a non-complete future that completes only the next time that the queue
+ * becomes non-empty or the {@link #notifyAvailable()} method is called.
*
* @return the first element from the queue, or Null if the queue is empty.
*/
@@ -127,6 +254,7 @@ public class FutureCompletingBlockingQueue<T> {
lock.lock();
try {
if (queue.size() == 0) {
+ moveToUnAvailable();
return null;
}
return dequeue();
@@ -149,6 +277,9 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ /**
+ * Gets the size of the queue.
+ */
public int size() {
lock.lock();
try {
@@ -158,6 +289,9 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ /**
+ * Checks whether the queue is empty.
+ */
public boolean isEmpty() {
lock.lock();
try {
@@ -167,6 +301,10 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ /**
+ * Checks the remaining capacity in the queue. That is the difference between the maximum capacity
+ * and the current number of elements in the queue.
+ */
public int remainingCapacity() {
lock.lock();
try {
@@ -176,6 +314,16 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ /**
+ * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in
+ * adding an element. to the queue. If the thread is blocked in {@link #put(int, Object)} it will
+ * immediately return from the method with a return value of false.
+ *
+ * <p>If this method is called, the next time the thread with the given index is about to be blocked
+ * in adding an element, it may immediately wake up and return.
+ *
+ * @param threadIndex The number identifying the thread.
+ */
public void wakeUpPuttingThread(int threadIndex) {
lock.lock();
try {
@@ -190,36 +338,34 @@ public class FutureCompletingBlockingQueue<T> {
}
}
- public void notifyAvailable() {
- futureNotifier.notifyComplete();
- }
-
// --------------- private helpers -------------------------
+ @GuardedBy("lock")
private void enqueue(T element) {
- int sizeBefore = queue.size();
+ final int sizeBefore = queue.size();
queue.add(element);
- futureNotifier.notifyComplete();
if (sizeBefore == 0) {
- notEmpty.signal();
+ moveToAvailable();
}
if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
signalNextPutter();
}
}
+ @GuardedBy("lock")
private T dequeue() {
- int sizeBefore = queue.size();
- T element = queue.poll();
+ final int sizeBefore = queue.size();
+ final T element = queue.poll();
if (sizeBefore == capacity && !notFull.isEmpty()) {
signalNextPutter();
}
- if (sizeBefore > 1) {
- notEmpty.signal();
+ if (queue.isEmpty()) {
+ moveToUnAvailable();
}
return element;
}
+ @GuardedBy("lock")
private void waitOnPut(int fetcherIndex) throws InterruptedException {
maybeCreateCondition(fetcherIndex);
Condition cond = putConditionAndFlags[fetcherIndex].condition();
@@ -227,12 +373,14 @@ public class FutureCompletingBlockingQueue<T> {
cond.await();
}
+ @GuardedBy("lock")
private void signalNextPutter() {
if (!notFull.isEmpty()) {
notFull.poll().signal();
}
}
+ @GuardedBy("lock")
private void maybeCreateCondition(int threadIndex) {
if (putConditionAndFlags.length < threadIndex + 1) {
putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
@@ -243,6 +391,7 @@ public class FutureCompletingBlockingQueue<T> {
}
}
+ @GuardedBy("lock")
private boolean getAndResetWakeUpFlag(int threadIndex) {
maybeCreateCondition(threadIndex);
if (putConditionAndFlags[threadIndex].getWakeUp()) {
@@ -275,4 +424,22 @@ public class FutureCompletingBlockingQueue<T> {
wakeUp = value;
}
}
+
+ // ------------------------------------------------------------------------
+ // utilities
+ // ------------------------------------------------------------------------
+
+ @SuppressWarnings("unchecked")
+ private static CompletableFuture<Void> getAvailableFuture() {
+ // this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to
+ // move the class from the runtime module to the core module
+ try {
+ final Class<?> clazz = Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
+ final Field field = clazz.getDeclaredField("AVAILABLE");
+ return (CompletableFuture<Void>) field.get(null);
+ }
+ catch (Throwable t) {
+ return CompletableFuture.completedFuture(null);
+ }
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
deleted file mode 100644
index 9330407..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A class facilitating the asynchronous communication among threads.
- */
-public class FutureNotifier {
- /** A future reference. */
- private final AtomicReference<CompletableFuture<Void>> futureRef;
-
- public FutureNotifier() {
- this.futureRef = new AtomicReference<>(null);
- }
-
- /**
- * Get the future out of this notifier. The future will be completed when someone invokes
- * {@link #notifyComplete()}. If there is already an uncompleted future, that existing
- * future will be returned instead of a new one.
- *
- * @return a future that will be completed when {@link #notifyComplete()} is invoked.
- */
- public CompletableFuture<Void> future() {
- CompletableFuture<Void> prevFuture = futureRef.get();
- if (prevFuture != null) {
- // Someone has created a future for us, don't create a new one.
- return prevFuture;
- } else {
- CompletableFuture<Void> newFuture = new CompletableFuture<>();
- boolean newFutureSet = futureRef.compareAndSet(null, newFuture);
- // If someone created a future after our previous check, use that future.
- // Otherwise, use the new future.
- return newFutureSet ? newFuture : future();
- }
- }
-
- /**
- * Complete the future if there is one. This will release the thread that is waiting for data.
- */
- public void notifyComplete() {
- CompletableFuture<Void> future = futureRef.get();
- // If there are multiple threads trying to complete the future, only the first one succeeds.
- if (future != null && future.complete(null)) {
- futureRef.compareAndSet(future, null);
- }
- }
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 0ec4297..84eeb4e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.junit.Rule;
import org.junit.Test;
@@ -62,12 +61,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
expectedException.expectMessage("One or more fetchers have encountered exception");
final String errMsg = "Testing Exception";
- FutureNotifier futureNotifier = new FutureNotifier();
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
- new FutureCompletingBlockingQueue<>(futureNotifier);
+ new FutureCompletingBlockingQueue<>();
// We have to handle split changes first, otherwise fetch will not be called.
try (MockSourceReader reader = new MockSourceReader(
- futureNotifier,
elementsQueue,
() -> new SplitReader<int[], MockSourceSplit>() {
@Override
@@ -127,13 +124,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
@Override
protected MockSourceReader createReader() {
- FutureNotifier futureNotifier = new FutureNotifier();
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
- new FutureCompletingBlockingQueue<>(futureNotifier);
+ new FutureCompletingBlockingQueue<>();
MockSplitReader mockSplitReader =
new MockSplitReader(2, true, true);
return new MockSourceReader(
- futureNotifier,
elementsQueue,
() -> mockSplitReader,
getConfig(),
@@ -183,12 +178,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
final String splitId,
final RecordsWithSplitIds<E> records) throws Exception {
- final FutureNotifier futureNotifier = new FutureNotifier();
final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
- new FutureCompletingBlockingQueue<>(futureNotifier);
+ new FutureCompletingBlockingQueue<>();
final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
- futureNotifier,
elementsQueue,
() -> new TestingSplitReader<E, TestingSourceSplit>(records),
new PassThroughRecordEmitter<E, TestingSourceSplit>(),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 6e27d95..c25490b 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.core.testutils.CheckedThread;
import org.junit.Test;
@@ -84,28 +83,28 @@ public class SplitFetcherTest {
@Test
public void testNotifiesWhenGoingIdle() {
- final FutureNotifier notifier = new FutureNotifier();
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
"test-split",
- new FutureCompletingBlockingQueue<>(notifier),
+ queue,
new TestingSplitReader<>(finishedSplitFetch("test-split")));
fetcher.runOnce();
assertTrue(fetcher.assignedSplits().isEmpty());
assertTrue(fetcher.isIdle());
- assertTrue(notifier.future().isDone());
+ assertTrue(queue.getAvailabilityFuture().isDone());
}
@Test
public void testNotifiesOlderFutureWhenGoingIdle() {
- final FutureNotifier notifier = new FutureNotifier();
+ final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
- "test-split",
- new FutureCompletingBlockingQueue<>(notifier),
- new TestingSplitReader<>(finishedSplitFetch("test-split")));
+ "test-split",
+ queue,
+ new TestingSplitReader<>(finishedSplitFetch("test-split")));
- final CompletableFuture<?> future = notifier.future();
+ final CompletableFuture<?> future = queue.getAvailabilityFuture();
fetcher.runOnce();
@@ -116,9 +115,8 @@ public class SplitFetcherTest {
@Test
public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
- final FutureNotifier notifier = new FutureNotifier();
final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
- new FutureCompletingBlockingQueue<>(notifier);
+ new FutureCompletingBlockingQueue<>();
final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
@@ -128,7 +126,7 @@ public class SplitFetcherTest {
try {
fetcher.runOnce();
- assertTrue(notifier.future().isDone());
+ assertTrue(queue.getAvailabilityFuture().isDone());
} finally {
queueDrainer.shutdown();
}
@@ -136,16 +134,15 @@ public class SplitFetcherTest {
@Test
public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
- final FutureNotifier notifier = new FutureNotifier();
final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
- new FutureCompletingBlockingQueue<>(notifier);
+ new FutureCompletingBlockingQueue<>();
final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
- "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+ "test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
queueDrainer.start();
- final CompletableFuture<?> future = notifier.future();
+ final CompletableFuture<?> future = queue.getAvailabilityFuture();
try {
fetcher.runOnce();
@@ -164,7 +161,7 @@ public class SplitFetcherTest {
final int numTotalRecords = numRecordsPerSplit * numSplits;
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
- new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
+ new FutureCompletingBlockingQueue<>(1);
SplitFetcher<int[], MockSourceSplit> fetcher =
new SplitFetcher<>(
0,
@@ -243,7 +240,7 @@ public class SplitFetcherTest {
private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
final SplitReader<E, TestingSourceSplit> reader) {
- return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+ return createFetcher(reader, new FutureCompletingBlockingQueue<>());
}
private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
@@ -255,7 +252,7 @@ public class SplitFetcherTest {
private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
final String splitId,
final SplitReader<E, TestingSourceSplit> reader) {
- return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+ return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(), reader);
}
private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
@@ -292,6 +289,7 @@ public class SplitFetcherTest {
queue.take();
}
catch (InterruptedException ignored) {
+ Thread.currentThread().interrupt();
// fall through the loop
}
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index ae46286..2681e5a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.InstantiationUtil;
@@ -68,15 +67,13 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc
@Override
public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
- FutureNotifier futureNotifier = new FutureNotifier();
FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
- new FutureCompletingBlockingQueue<>(futureNotifier);
+ new FutureCompletingBlockingQueue<>();
Configuration config = new Configuration();
config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
return new MockSourceReader(
- futureNotifier,
elementsQueue,
() -> new MockSplitReader(2, true, true),
config,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 92a19ef..66022db 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -25,7 +25,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,12 +36,11 @@ import java.util.function.Supplier;
public class MockSourceReader
extends SingleThreadMultiplexSourceReaderBase<int[], Integer, MockSourceSplit, AtomicInteger> {
- public MockSourceReader(FutureNotifier futureNotifier,
- FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
+ public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
Configuration config,
SourceReaderContext context) {
- super(futureNotifier, elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
+ super(elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
}
@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index c1bde50..ef056d9e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.base.source.reader.synchronization;
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
import org.junit.Test;
import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -42,10 +45,9 @@ public class FutureCompletingBlockingQueueTest {
@Test
public void testBasics() throws InterruptedException {
- FutureNotifier futureNotifier = new FutureNotifier();
- FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
- CompletableFuture<Void> future = futureNotifier.future();
+ CompletableFuture<Void> future = queue.getAvailabilityFuture();
assertTrue(queue.isEmpty());
assertEquals(0, queue.size());
@@ -66,8 +68,7 @@ public class FutureCompletingBlockingQueueTest {
@Test
public void testPoll() throws InterruptedException {
- FutureNotifier futureNotifier = new FutureNotifier();
- FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
queue.put(0, 1234);
Integer value = queue.poll();
assertNotNull(value);
@@ -76,8 +77,7 @@ public class FutureCompletingBlockingQueueTest {
@Test
public void testWakeUpPut() throws InterruptedException {
- FutureNotifier futureNotifier = new FutureNotifier();
- FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
@@ -97,8 +97,7 @@ public class FutureCompletingBlockingQueueTest {
@Test
public void testConcurrency() throws InterruptedException {
- FutureNotifier futureNotifier = new FutureNotifier();
- FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+ FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
final int numValuesPerThread = 10000;
final int numPuttingThreads = 5;
List<Thread> threads = new ArrayList<>();
@@ -146,12 +145,21 @@ public class FutureCompletingBlockingQueueTest {
@Test
public void testFutureCompletingBlockingQueueConstructor() {
- FutureNotifier notifier = new FutureNotifier();
- FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier);
- FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+ FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>();
+ FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
// The capacity of the queue needs to be equal to 10000
assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
// The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
}
+
+ /**
+ * This test is to guard that our reflection is not broken and we do not lose the
+ * performance advantage. This is possible, because the tests depend on the runtime modules
+ * while the main scope does not.
+ */
+ @Test
+ public void testQueueUsesShortCircuitFuture() {
+ assertSame(AvailabilityProvider.AVAILABLE, FutureCompletingBlockingQueue.AVAILABLE);
+ }
}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
deleted file mode 100644
index b257ebf..0000000
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The unit tests for {@link FutureNotifier}.
- */
-public class FutureNotifierTest {
-
- @Test
- public void testGetFuture() {
- FutureNotifier notifier = new FutureNotifier();
- CompletableFuture<Void> future = notifier.future();
- // The future should not be null.
- assertNotNull(future);
- // Calling the future again should return the same future.
- assertEquals(future, notifier.future());
- }
-
- @Test
- public void testCompleteFuture() {
- FutureNotifier notifier = new FutureNotifier();
- CompletableFuture<Void> future = notifier.future();
- assertFalse(future.isDone());
- notifier.notifyComplete();
- assertTrue(future.isDone());
- }
-
- @Test
- public void testConcurrency() throws InterruptedException, ExecutionException {
- final int times = 1_000_000;
- final int nThreads = 5;
- FutureNotifier notifier = new FutureNotifier();
- // A thread pool that simply gets futures out of the notifier.
- ExecutorService listenerExecutor = Executors.newFixedThreadPool(nThreads);
- // A thread pool that completes the futures.
- ExecutorService notifierExecutor = Executors.newFixedThreadPool(nThreads);
-
- CountDownLatch runningListeners = new CountDownLatch(nThreads);
- CountDownLatch startCommand = new CountDownLatch(1);
- CountDownLatch finishLine = new CountDownLatch(1);
-
- List<Future<?>> executionFutures = new ArrayList<>();
- // Start nThreads thread getting futures out of the notifier.
- for (int i = 0; i < nThreads; i++) {
- executionFutures.add(listenerExecutor.submit(() -> {
- try {
- List<CompletableFuture<Void>> futures = new ArrayList<>(times);
- startCommand.await();
- for (int j = 0; j < times; j++) {
- futures.add(notifier.future());
- }
- runningListeners.countDown();
- // Wait for the notifying thread to finish.
- finishLine.await();
- // All the futures should have been completed.
- futures.forEach(f -> {
- assertNotNull(f);
- assertTrue(f.isDone());
- });
- } catch (Exception e) {
- fail();
- }
- }));
- }
-
- // Start nThreads thread notifying the completion.
- for (int i = 0; i < nThreads; i++) {
- notifierExecutor.submit(() -> {
- try {
- startCommand.await();
- while (runningListeners.getCount() > 0) {
- notifier.notifyComplete();
- }
- notifier.notifyComplete();
- finishLine.countDown();
- } catch (Exception e) {
- fail();
- }
- });
- }
-
- // Kick off the threads.
- startCommand.countDown();
-
- try {
- for (Future<?> executionFuture : executionFutures) {
- executionFuture.get();
- }
- } finally {
- listenerExecutor.shutdown();
- notifierExecutor.shutdown();
- listenerExecutor.awaitTermination(30L, TimeUnit.SECONDS);
- notifierExecutor.awaitTermination(30L, TimeUnit.SECONDS);
- }
- }
-}