You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/09/15 20:13:48 UTC

[flink] branch master updated (3df63de -> c60aaff)

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 3df63de  [FLINK-19152] Remove Kafka 0.10.x and 0.11.x connectors
     new 070aacb  [hotfix][hadoop] Minor code cleanups in HadoopFileStatus
     new 5bcdd7d  [FLINK-19218][core] Remove inconsistent/misleading locality information from Local File Splits.
     new 7375589  [FLINK-19221][core][hadoop] Introduce the LocatedFileStatus to save block location RPC requests.
     new a5b0d32  [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
     new d762576  [FLINK-19225][connectors] Various small improvements to SourceReaderBase
     new 3b2f54b  [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
     new a820646  [refactor][core] Eagerly initialize the FetchTask to support proper unit testing
     new 5118570  [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.
     new 4ea9578  [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector
     new cef8a58  [FLINK-19245][connectors] Set default capacity for FutureCompletingBlockingQueue.
     new c60aaff  [hotfix][tests] Extend test coverage for FutureCompletingBlockingQueue.

The 11 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../SingleThreadMultiplexSourceReaderBase.java     |   5 +-
 .../base/source/reader/SourceReaderBase.java       |  55 ++-
 .../base/source/reader/SourceReaderOptions.java    |   2 +-
 .../base/source/reader/fetcher/FetchTask.java      |  30 +-
 .../reader/fetcher/SingleThreadFetcherManager.java |   4 +-
 .../base/source/reader/fetcher/SplitFetcher.java   |  52 ++-
 .../source/reader/fetcher/SplitFetcherManager.java |  11 +-
 .../FutureCompletingBlockingQueue.java             | 443 ++++++++++++++++++---
 .../reader/synchronization/FutureNotifier.java     |  66 ---
 .../base/source/reader/SourceReaderBaseTest.java   |  13 +-
 .../source/reader/fetcher/SplitFetcherTest.java    | 218 +++++++++-
 .../base/source/reader/mocks/MockBaseSource.java   |   5 +-
 .../base/source/reader/mocks/MockSourceReader.java |   6 +-
 .../source/reader/mocks/TestingSplitReader.java    |  13 +-
 .../FutureCompletingBlockingQueueTest.java         | 214 +++++++++-
 .../reader/synchronization/FutureNotifierTest.java | 131 ------
 .../{BlockLocation.java => LocatedFileStatus.java} |  36 +-
 .../flink/core/fs/local/LocalBlockLocation.java    |  16 +-
 .../flink/core/fs/local/LocalFileStatus.java       |  22 +-
 .../flink/core/fs/local/LocalFileSystem.java       |  25 +-
 .../java/org/apache/flink/util/StringUtils.java    |   6 +
 .../fs/anotherdummy/AnotherDummyFSFileSystem.java  |   4 +-
 .../apache/flink/fs/dummy/DummyFSFileSystem.java   |   4 +-
 .../flink/runtime/fs/hdfs/HadoopBlockLocation.java |  55 ++-
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java    |  37 +-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    |  18 +-
 .../runtime/fs/hdfs/LocatedHadoopFileStatus.java   |  48 +++
 27 files changed, 1052 insertions(+), 487 deletions(-)
 delete mode 100644 flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
 delete mode 100644 flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
 copy flink-core/src/main/java/org/apache/flink/core/fs/{BlockLocation.java => LocatedFileStatus.java} (51%)
 create mode 100644 flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java


[flink] 06/11: [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3b2f54bcb437f98e6137c904045cc51072b5c06b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 19:47:40 2020 +0200

    [hotfix][tests] Move constants in SplitFetcherTest relevant to only one test into test method
---
 .../source/reader/fetcher/SplitFetcherTest.java    | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index eef8328..4fa99dd 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -41,13 +41,14 @@ import static org.junit.Assert.assertTrue;
  * Unit test for {@link SplitFetcher}.
  */
 public class SplitFetcherTest {
-	private static final int NUM_SPLITS = 3;
-	private static final int NUM_RECORDS_PER_SPLIT = 10_000;
-	private static final int INTERRUPT_RECORDS_INTERVAL = 10;
-	private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
 
 	@Test
 	public void testWakeup() throws InterruptedException {
+		final int numSplits = 3;
+		final int numRecordsPerSplit = 10_000;
+		final int interruptRecordsInterval = 10;
+		final int numTotalRecords = numRecordsPerSplit * numSplits;
+
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
 			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
@@ -59,10 +60,10 @@ public class SplitFetcherTest {
 
 		// Prepare the splits.
 		List<MockSourceSplit> splits = new ArrayList<>();
-		for (int i = 0; i < NUM_SPLITS; i++) {
-			splits.add(new MockSourceSplit(i, 0, NUM_RECORDS_PER_SPLIT));
-			int base = i * NUM_RECORDS_PER_SPLIT;
-			for (int j = base; j < base + NUM_RECORDS_PER_SPLIT; j++) {
+		for (int i = 0; i < numSplits; i++) {
+			splits.add(new MockSourceSplit(i, 0, numRecordsPerSplit));
+			int base = i * numRecordsPerSplit;
+			for (int j = base; j < base + numRecordsPerSplit; j++) {
 				splits.get(splits.size() - 1).addRecord(j);
 			}
 		}
@@ -81,9 +82,9 @@ public class SplitFetcherTest {
 			@Override
 			public void run() {
 				int lastInterrupt = 0;
-				while (recordsRead.size() < NUM_TOTAL_RECORDS && !stop.get()) {
+				while (recordsRead.size() < numTotalRecords && !stop.get()) {
 					int numRecordsRead = recordsRead.size();
-					if (numRecordsRead >= lastInterrupt + INTERRUPT_RECORDS_INTERVAL) {
+					if (numRecordsRead >= lastInterrupt + interruptRecordsInterval) {
 						fetcher.wakeUp(false);
 						wakeupTimes.incrementAndGet();
 						lastInterrupt = numRecordsRead;
@@ -96,7 +97,7 @@ public class SplitFetcherTest {
 			fetcherThread.start();
 			interrupter.start();
 
-			while (recordsRead.size() < NUM_SPLITS * NUM_RECORDS_PER_SPLIT) {
+			while (recordsRead.size() < numSplits * numRecordsPerSplit) {
 				final RecordsWithSplitIds<int[]> nextBatch = elementQueue.take();
 				while (nextBatch.nextSplit() != null) {
 					int[] arr;
@@ -106,9 +107,9 @@ public class SplitFetcherTest {
 				}
 			}
 
-			assertEquals(NUM_TOTAL_RECORDS, recordsRead.size());
+			assertEquals(numTotalRecords, recordsRead.size());
 			assertEquals(0, (int) recordsRead.first());
-			assertEquals(NUM_TOTAL_RECORDS - 1, (int) recordsRead.last());
+			assertEquals(numTotalRecords - 1, (int) recordsRead.last());
 			assertTrue(wakeupTimes.get() > 0);
 		} finally {
 			stop.set(true);


[flink] 02/11: [FLINK-19218][core] Remove inconsistent/misleading locality information from Local File Splits.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bcdd7de67bf21bc19c8b5b84f617dacaf7f0c99
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 16:28:53 2020 +0200

    [FLINK-19218][core] Remove inconsistent/misleading locality information from Local File Splits.
---
 .../flink/core/fs/local/LocalBlockLocation.java      | 16 ++++++++--------
 .../apache/flink/core/fs/local/LocalFileSystem.java  | 20 +-------------------
 .../main/java/org/apache/flink/util/StringUtils.java |  6 ++++++
 3 files changed, 15 insertions(+), 27 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
index 25dd92d..512bc5b 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalBlockLocation.java
@@ -20,27 +20,27 @@ package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.BlockLocation;
-
-import java.io.IOException;
+import org.apache.flink.util.StringUtils;
 
 /**
  * Implementation of the {@link BlockLocation} interface for a local file system.
+ *
+ * <p>Local files have only one block that represents the entire file.
+ * The block has no location information, because it cannot be determined where the files (or their blocks)
+ * actually reside, especially in cases where the files are on a mounted file system.
  */
 @Internal
 public class LocalBlockLocation implements BlockLocation {
 
 	private final long length;
 
-	private final String[] hosts;
-
-	public LocalBlockLocation(final String host, final long length) {
-		this.hosts = new String[] { host };
+	public LocalBlockLocation(final long length) {
 		this.length = length;
 	}
 
 	@Override
-	public String[] getHosts() throws IOException {
-		return this.hosts;
+	public String[] getHosts() {
+		return StringUtils.EMPTY_STRING_ARRAY;
 	}
 
 	@Override
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 32ae50b..154fc2c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -35,15 +35,10 @@ import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.OperatingSystem;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.net.InetAddress;
 import java.net.URI;
-import java.net.UnknownHostException;
 import java.nio.file.AccessDeniedException;
 import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.FileAlreadyExistsException;
@@ -61,8 +56,6 @@ import static org.apache.flink.util.Preconditions.checkState;
 @Internal
 public class LocalFileSystem extends FileSystem {
 
-	private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystem.class);
-
 	/** The URI representing the local file system. */
 	private static final URI LOCAL_URI = OperatingSystem.isWindows() ? URI.create("file:/") : URI.create("file:///");
 
@@ -77,23 +70,12 @@ public class LocalFileSystem extends FileSystem {
 	 * Because Paths are not immutable, we cannot cache the proper path here. */
 	private final URI homeDir;
 
-	/** The host name of this machine. */
-	private final String hostName;
-
 	/**
 	 * Constructs a new <code>LocalFileSystem</code> object.
 	 */
 	public LocalFileSystem() {
 		this.workingDir = new File(System.getProperty("user.dir")).toURI();
 		this.homeDir = new File(System.getProperty("user.home")).toURI();
-
-		String tmp = "unknownHost";
-		try {
-			tmp = InetAddress.getLocalHost().getHostName();
-		} catch (UnknownHostException e) {
-			LOG.error("Could not resolve local host", e);
-		}
-		this.hostName = tmp;
 	}
 
 	// ------------------------------------------------------------------------
@@ -101,7 +83,7 @@ public class LocalFileSystem extends FileSystem {
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return new BlockLocation[] {
-				new LocalBlockLocation(hostName, file.getLen())
+				new LocalBlockLocation(file.getLen())
 		};
 	}
 
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index fa591d3..a23bc14 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -41,6 +41,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @PublicEvolving
 public final class StringUtils {
 
+	/**
+	 * An empty string array. There are just too many places where one needs an empty string array
+	 * and wants to save some object allocation.
+	 */
+	public static final String[] EMPTY_STRING_ARRAY = new String[0];
+
 	private static final char[] HEX_CHARS = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
 
 	/**


[flink] 07/11: [refactor][core] Eagerly initialize the FetchTask to support proper unit testing

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8206467af0830dcb89623ea068b5ca3b3450c92
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 20:51:56 2020 +0200

    [refactor][core] Eagerly initialize the FetchTask to support proper unit testing
    
    Previously, the FetchTask was constructed lazily in the run() method, which gets in the
    way of unit testing via the runOnce() method.
---
 .../base/source/reader/fetcher/SplitFetcher.java        | 17 +++++++++--------
 1 file changed, 9 insertions(+), 8 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index fa1442e..289dc34 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -75,20 +75,21 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		this.isIdle = true;
 		this.wakeUp = new AtomicBoolean(false);
 		this.closed = new AtomicBoolean(false);
+
+		this.fetchTask = new FetchTask<>(
+				splitReader,
+				elementsQueue,
+				ids -> {
+					ids.forEach(assignedSplits::remove);
+					updateIsIdle();
+				},
+				id);
 	}
 
 	@Override
 	public void run() {
 		LOG.info("Starting split fetcher {}", id);
 		try {
-			// Remove the split from the assignments if it is already done.
-			this.fetchTask = new FetchTask<>(
-					splitReader,
-					elementsQueue,
-					ids -> {
-						ids.forEach(assignedSplits::remove);
-						updateIsIdle();
-					}, id);
 			while (!closed.get()) {
 				runOnce();
 			}


[flink] 01/11: [hotfix][hadoop] Minor code cleanups in HadoopFileStatus

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 070aacb6a9c0e4cdef50b220e78f6f1c8b134155
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 15:58:25 2020 +0200

    [hotfix][hadoop] Minor code cleanups in HadoopFileStatus
---
 .../flink/runtime/fs/hdfs/HadoopBlockLocation.java | 55 +++++++++++-----------
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java    | 20 +++-----
 2 files changed, 34 insertions(+), 41 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
index 1484c95..2880bae 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopBlockLocation.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Implementation of the {@link BlockLocation} interface for the
  * Hadoop Distributed File System.
@@ -57,19 +59,16 @@ public final class HadoopBlockLocation implements BlockLocation {
 	 *        the original HDFS block location
 	 */
 	public HadoopBlockLocation(final org.apache.hadoop.fs.BlockLocation blockLocation) {
-
-		this.blockLocation = blockLocation;
+		this.blockLocation = checkNotNull(blockLocation, "blockLocation");
 	}
 
 	@Override
 	public String[] getHosts() throws IOException {
 
-		/**
-		 * Unfortunately, the Hadoop API is not precise about if the list returned by BlockLocation.getHosts() contains
-		 * the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
-		 * depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
-		 * sure it does not contain the domain suffix.
-		 */
+		// Unfortunately, the Hadoop API is not precise about whether the list returned by BlockLocation.getHosts() contains
+		// the hostnames with their respective domain suffix or not (FQDN or not). We have witnessed both versions,
+		// depending on the cluster's network configuration. As a workaround, we therefore strip every hostname to make
+		// sure it does not contain the domain suffix.
 		if (this.hostnames == null) {
 
 			final String[] hadoopHostnames = blockLocation.getHosts();
@@ -83,6 +82,26 @@ public final class HadoopBlockLocation implements BlockLocation {
 		return this.hostnames;
 	}
 
+	@Override
+	public long getLength() {
+		return this.blockLocation.getLength();
+	}
+
+	@Override
+	public long getOffset() {
+		return this.blockLocation.getOffset();
+	}
+
+	@Override
+	public int compareTo(final BlockLocation o) {
+		final long diff = getOffset() - o.getOffset();
+		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Looks for a domain suffix in a FQDN and strips it if present.
 	 *
@@ -110,24 +129,4 @@ public final class HadoopBlockLocation implements BlockLocation {
 
 		return originalHostname.substring(0, index);
 	}
-
-	@Override
-	public long getLength() {
-
-		return this.blockLocation.getLength();
-	}
-
-	@Override
-	public long getOffset() {
-
-		return this.blockLocation.getOffset();
-	}
-
-	@Override
-	public int compareTo(final BlockLocation o) {
-
-		final long diff = getOffset() - o.getOffset();
-
-		return diff < 0 ? -1 : diff > 0 ? 1 : 0;
-	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 08d31de..2346d92 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.fs.Path;
  */
 public final class HadoopFileStatus implements FileStatus {
 
-	private org.apache.hadoop.fs.FileStatus fileStatus;
+	private final org.apache.hadoop.fs.FileStatus fileStatus;
 
 	/**
 	 * Creates a new file status from an HDFS file status.
@@ -46,12 +46,7 @@ public final class HadoopFileStatus implements FileStatus {
 
 	@Override
 	public long getBlockSize() {
-		long blocksize = fileStatus.getBlockSize();
-		if (blocksize > fileStatus.getLen()) {
-			return fileStatus.getLen();
-		}
-
-		return blocksize;
+		return Math.min(fileStatus.getBlockSize(), fileStatus.getLen());
 	}
 
 	@Override
@@ -69,18 +64,17 @@ public final class HadoopFileStatus implements FileStatus {
 		return fileStatus.getReplication();
 	}
 
-	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
-		return this.fileStatus;
-	}
-
 	@Override
 	public Path getPath() {
 		return new Path(fileStatus.getPath().toUri());
 	}
 
-	@SuppressWarnings("deprecation")
 	@Override
 	public boolean isDir() {
-		return fileStatus.isDir();
+		return fileStatus.isDirectory();
+	}
+
+	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
+		return this.fileStatus;
 	}
 }


[flink] 08/11: [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 511857049ba30c8ff0ee56da551fa4a479dc583e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 20:55:57 2020 +0200

    [FLINK-18128][connectors] Ensure idle split fetchers lead to availability notifications.
---
 .../base/source/reader/fetcher/SplitFetcher.java   |  23 ++-
 .../FutureCompletingBlockingQueue.java             |   4 +
 .../source/reader/fetcher/SplitFetcherTest.java    | 185 +++++++++++++++++++++
 3 files changed, 206 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 289dc34..3beb0da 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -57,6 +57,10 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private final AtomicBoolean closed;
 	private FetchTask<E, SplitT> fetchTask;
 	private volatile SplitFetcherTask runningTask = null;
+
+	/** Flag whether this fetcher has no work assigned at the moment.
+	 * Fetchers that have work (a split) assigned but are currently blocked (for example enqueueing
+	 * a fetch and hitting the element queue limit) are NOT considered idle. */
 	private volatile boolean isIdle;
 
 	SplitFetcher(
@@ -81,7 +85,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 				elementsQueue,
 				ids -> {
 					ids.forEach(assignedSplits::remove);
-					updateIsIdle();
+					checkAndSetIdle();
 				},
 				id);
 	}
@@ -168,7 +172,7 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	 */
 	public void addSplits(List<SplitT> splitsToAdd) {
 		maybeEnqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, splitChanges, assignedSplits));
-		updateIsIdle();
+		isIdle = false; // in case we were idle before
 		wakeUp(true);
 	}
 
@@ -292,6 +296,17 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 
 	}
 
+	private void checkAndSetIdle() {
+		final boolean nowIdle = assignedSplits.isEmpty() && taskQueue.isEmpty() && splitChanges.isEmpty();
+		if (nowIdle) {
+			isIdle = true;
+
+			// because the method might get invoked past the point when the source reader last checked
+			// the elements queue, we need to notify availability in the case when we become idle
+			elementsQueue.notifyAvailable();
+		}
+	}
+
 	//--------------------- Helper class ------------------
 
 	private static class DummySplitFetcherTask implements SplitFetcherTask {
@@ -316,8 +331,4 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 			return name;
 		}
 	}
-
-	private void updateIsIdle() {
-		isIdle = taskQueue.isEmpty() && splitChanges.isEmpty() && assignedSplits.isEmpty();
-	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index ea0f030..dcbb66e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -190,6 +190,10 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	public void notifyAvailable() {
+		futureNotifier.notifyComplete();
+	}
+
 	// --------------- private helpers -------------------------
 
 	private void enqueue(T element) {
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 4fa99dd..6e27d95 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -19,10 +19,15 @@
 package org.apache.flink.connector.base.source.reader.fetcher;
 
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
+import org.apache.flink.connector.base.source.reader.RecordsBySplits;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
+import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
+import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
 
@@ -31,10 +36,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -43,6 +50,113 @@ import static org.junit.Assert.assertTrue;
 public class SplitFetcherTest {
 
 	@Test
+	public void testNewFetcherIsIdle() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+		assertTrue(fetcher.isIdle());
+	}
+
+	@Test
+	public void testFetcherNotIdleAfterSplitAdded() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcher(new TestingSplitReader<>());
+		final TestingSourceSplit split = new TestingSourceSplit("test-split");
+
+		fetcher.addSplits(Collections.singletonList(split));
+
+		assertFalse(fetcher.isIdle());
+
+		// need to loop here because the internal wakeup flag handling means we need multiple loops
+		while (fetcher.assignedSplits().isEmpty()) {
+			fetcher.runOnce();
+			assertFalse(fetcher.isIdle());
+		}
+	}
+
+	@Test
+	public void testIdleAfterFinishedSplitsEnqueued() {
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+	}
+
+	@Test
+	public void testNotifiesWhenGoingIdle() {
+		final FutureNotifier notifier = new FutureNotifier();
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split",
+			new FutureCompletingBlockingQueue<>(notifier),
+			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+		assertTrue(notifier.future().isDone());
+	}
+
+	@Test
+	public void testNotifiesOlderFutureWhenGoingIdle() {
+		final FutureNotifier notifier = new FutureNotifier();
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split",
+			new FutureCompletingBlockingQueue<>(notifier),
+			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final CompletableFuture<?> future = notifier.future();
+
+		fetcher.runOnce();
+
+		assertTrue(fetcher.assignedSplits().isEmpty());
+		assertTrue(fetcher.isIdle());
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
+		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+				new FutureCompletingBlockingQueue<>(notifier);
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+		queueDrainer.start();
+
+		try {
+			fetcher.runOnce();
+
+			assertTrue(notifier.future().isDone());
+		} finally {
+			queueDrainer.shutdown();
+		}
+	}
+
+	@Test
+	public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
+		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
+			new FutureCompletingBlockingQueue<>(notifier);
+		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
+			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+
+		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
+		queueDrainer.start();
+
+		final CompletableFuture<?> future = notifier.future();
+
+		try {
+			fetcher.runOnce();
+
+			assertTrue(future.isDone());
+		} finally {
+			queueDrainer.shutdown();
+		}
+	}
+
+	@Test
 	public void testWakeup() throws InterruptedException {
 		final int numSplits = 3;
 		final int numRecordsPerSplit = 10_000;
@@ -118,4 +232,75 @@ public class SplitFetcherTest {
 			interrupter.join();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  testing utils
+	// ------------------------------------------------------------------------
+
+	private static <E> RecordsBySplits<E> finishedSplitFetch(String splitId) {
+		return new RecordsBySplits<>(Collections.emptyMap(), Collections.singleton(splitId));
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+			final SplitReader<E, TestingSourceSplit> reader) {
+		return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
+			final SplitReader<E, TestingSourceSplit> reader,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue) {
+		return new SplitFetcher<>(0, queue, reader, () -> {});
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+			final String splitId,
+			final SplitReader<E, TestingSourceSplit> reader) {
+		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+	}
+
+	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
+			final String splitId,
+			final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> queue,
+			final SplitReader<E, TestingSourceSplit> reader) {
+
+		final SplitFetcher<E, TestingSourceSplit> fetcher = createFetcher(reader, queue);
+
+		fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
+		while (fetcher.assignedSplits().isEmpty()) {
+			fetcher.runOnce();
+		}
+		return fetcher;
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class QueueDrainerThread extends CheckedThread {
+
+		private final FutureCompletingBlockingQueue<?> queue;
+		private volatile boolean running = true;
+
+		QueueDrainerThread(FutureCompletingBlockingQueue<?> queue) {
+			super("Queue Drainer");
+			setPriority(Thread.MAX_PRIORITY);
+			this.queue = queue;
+		}
+
+		@Override
+		public void go() throws Exception {
+			while (running) {
+				try {
+					queue.take();
+				}
+				catch (InterruptedException ignored) {
+					// fall through the loop
+				}
+			}
+		}
+
+		public void shutdown() throws Exception {
+			running = false;
+			interrupt();
+			sync();
+		}
+	}
 }


[flink] 03/11: [FLINK-19221][core][hadoop] Introduce the LocatedFileStatus to save block location RPC requests.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7375589cc17d801af1306c3bb7563fc539b91d01
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 17:33:49 2020 +0200

    [FLINK-19221][core][hadoop] Introduce the LocatedFileStatus to save block location RPC requests.
    
    When the HDFS Client returns a FileStatus (description of a file) it frequently returns a
    'LocatedFileStatus' which already contains all the BlockLocation information.
    
    We here expose this on the Flink side, to save RPC calls to the Name Node. For example file sources often
    request block locations for all files (to facilitate locality aware assignments), currently resulting in
    one RPC call to the Name Node for each file.
    
    When the FileStatus obtained from listing the directory (or getting details for a file) already has
    the block locations, we can save the extra RPC call per file to obtain that block location information.
    
    This closes #13394
---
 .../apache/flink/core/fs/LocatedFileStatus.java    | 43 +++++++++++++++++++
 .../flink/core/fs/local/LocalFileStatus.java       | 22 ++++++++--
 .../flink/core/fs/local/LocalFileSystem.java       |  7 ++--
 .../fs/anotherdummy/AnotherDummyFSFileSystem.java  |  4 +-
 .../apache/flink/fs/dummy/DummyFSFileSystem.java   |  4 +-
 .../flink/runtime/fs/hdfs/HadoopFileStatus.java    | 17 +++++++-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java    | 18 +++++---
 .../runtime/fs/hdfs/LocatedHadoopFileStatus.java   | 48 ++++++++++++++++++++++
 8 files changed, 144 insertions(+), 19 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java
new file mode 100644
index 0000000..f391b19
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/LocatedFileStatus.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.Public;
+
+/**
+ * A {@code LocatedFileStatus} is a {@link FileStatus} that additionally contains the location information
+ * of the file directly. The information is accessible through the {@link #getBlockLocations()} method.
+ *
+ * <p>This class eagerly communicates the block information (including locations) when that information
+ * is readily (or cheaply) available. That way users can avoid an additional call to
+ * {@link FileSystem#getFileBlockLocations(FileStatus, long, long)}, which is an additional RPC call for
+ * each file.
+ */
+@Public
+public interface LocatedFileStatus extends FileStatus {
+
+	/**
+	 * Gets the location information for the file. The location is per block, because each block may
+	 * potentially live at a different location.
+	 *
+	 * <p>Files without location information typically expose one block with no host information
+	 * for that block.
+	 */
+	BlockLocation[] getBlockLocations();
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
index 781e0d3..1b34c4f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileStatus.java
@@ -19,8 +19,10 @@
 package org.apache.flink.core.fs.local;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.LocatedFileStatus;
 import org.apache.flink.core.fs.Path;
 
 import java.io.File;
@@ -30,7 +32,7 @@ import java.io.File;
  * for the local file system.
  */
 @Internal
-public class LocalFileStatus implements FileStatus {
+public class LocalFileStatus implements LocatedFileStatus {
 
 	/**
 	 * The file this file status belongs to.
@@ -43,6 +45,11 @@ public class LocalFileStatus implements FileStatus {
 	private final Path path;
 
 	/**
+	 * Cached length field, to avoid repeated native/syscalls.
+	 */
+	private final long len;
+
+	/**
 	 * Creates a <code>LocalFileStatus</code> object from a given {@link File} object.
 	 *
 	 * @param f
@@ -53,6 +60,7 @@ public class LocalFileStatus implements FileStatus {
 	public LocalFileStatus(final File f, final FileSystem fs) {
 		this.file = f;
 		this.path = new Path(fs.getUri().getScheme() + ":" + f.toURI().getPath());
+		this.len = f.length();
 	}
 
 	@Override
@@ -62,12 +70,12 @@ public class LocalFileStatus implements FileStatus {
 
 	@Override
 	public long getBlockSize() {
-		return this.file.length();
+		return this.len;
 	}
 
 	@Override
 	public long getLen() {
-		return this.file.length();
+		return this.len;
 	}
 
 	@Override
@@ -90,6 +98,14 @@ public class LocalFileStatus implements FileStatus {
 		return this.path;
 	}
 
+	@Override
+	public BlockLocation[] getBlockLocations() {
+		// we construct this lazily here and don't cache it, because it is used only rarely
+		return new BlockLocation[] {
+			new LocalBlockLocation(len)
+		};
+	}
+
 	public File getFile() {
 		return this.file;
 	}
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
index 154fc2c..ae7beed 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalFileSystem.java
@@ -82,9 +82,10 @@ public class LocalFileSystem extends FileSystem {
 
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
-		return new BlockLocation[] {
-				new LocalBlockLocation(file.getLen())
-		};
+		if (file instanceof LocalFileStatus) {
+			return ((LocalFileStatus) file).getBlockLocations();
+		}
+		throw new IOException("File status does not belong to the LocalFileSystem: " + file);
 	}
 
 	@Override
diff --git a/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java b/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
index 99a5b47..31ad157 100644
--- a/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
+++ b/flink-end-to-end-tests/flink-plugins-test/another-dummy-fs/src/main/java/org/apache/flink/fs/anotherdummy/AnotherDummyFSFileSystem.java
@@ -45,8 +45,6 @@ class AnotherDummyFSFileSystem extends FileSystem {
 
 	static final URI FS_URI = URI.create("anotherDummy:///");
 
-	private static final String HOSTNAME = "localhost";
-
 	private final URI workingDir;
 
 	private final URI homeDir;
@@ -93,7 +91,7 @@ class AnotherDummyFSFileSystem extends FileSystem {
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return new BlockLocation[] {
-			new LocalBlockLocation(HOSTNAME, file.getLen())
+			new LocalBlockLocation(file.getLen())
 		};
 	}
 
diff --git a/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java b/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
index ac11807..236d0d5 100644
--- a/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
+++ b/flink-end-to-end-tests/flink-plugins-test/dummy-fs/src/main/java/org/apache/flink/fs/dummy/DummyFSFileSystem.java
@@ -45,8 +45,6 @@ class DummyFSFileSystem extends FileSystem {
 
 	static final URI FS_URI = URI.create("dummy:///");
 
-	private static final String HOSTNAME = "localhost";
-
 	private final URI workingDir;
 
 	private final URI homeDir;
@@ -93,7 +91,7 @@ class DummyFSFileSystem extends FileSystem {
 	@Override
 	public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
 		return new BlockLocation[] {
-			new LocalBlockLocation(HOSTNAME, file.getLen())
+			new LocalBlockLocation(file.getLen())
 		};
 	}
 
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
index 2346d92..2d21498 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileStatus.java
@@ -23,9 +23,9 @@ import org.apache.flink.core.fs.Path;
 
 /**
  * Concrete implementation of the {@link FileStatus} interface for the
- * Hadoop Distribution File System.
+ * Hadoop Distributed File System.
  */
-public final class HadoopFileStatus implements FileStatus {
+public class HadoopFileStatus implements FileStatus {
 
 	private final org.apache.hadoop.fs.FileStatus fileStatus;
 
@@ -77,4 +77,17 @@ public final class HadoopFileStatus implements FileStatus {
 	public org.apache.hadoop.fs.FileStatus getInternalFileStatus() {
 		return this.fileStatus;
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@code HadoopFileStatus} from Hadoop's {@link org.apache.hadoop.fs.FileStatus}.
+	 * If Hadoop's file status is <i>located</i>, i.e., it contains block information, then this method
+	 * returns an implementation of Flink's {@link org.apache.flink.core.fs.LocatedFileStatus}.
+	 */
+	public static HadoopFileStatus fromHadoopStatus(final org.apache.hadoop.fs.FileStatus fileStatus) {
+		return fileStatus instanceof org.apache.hadoop.fs.LocatedFileStatus
+				? new LocatedHadoopFileStatus((org.apache.hadoop.fs.LocatedFileStatus) fileStatus)
+				: new HadoopFileStatus(fileStatus);
+	}
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 1135e01..84f4ec7 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -83,7 +83,7 @@ public class HadoopFileSystem extends FileSystem {
 	@Override
 	public FileStatus getFileStatus(final Path f) throws IOException {
 		org.apache.hadoop.fs.FileStatus status = this.fs.getFileStatus(toHadoopPath(f));
-		return new HadoopFileStatus(status);
+		return HadoopFileStatus.fromHadoopStatus(status);
 	}
 
 	@Override
@@ -93,10 +93,18 @@ public class HadoopFileSystem extends FileSystem {
 			throw new IOException("file is not an instance of DistributedFileStatus");
 		}
 
-		final HadoopFileStatus f = (HadoopFileStatus) file;
+		// shortcut - if the status already has the information, return it.
+		if (file instanceof LocatedHadoopFileStatus) {
+			return ((LocatedHadoopFileStatus) file).getBlockLocations();
+		}
+
+		final org.apache.hadoop.fs.FileStatus hadoopStatus = ((HadoopFileStatus) file).getInternalFileStatus();
 
-		final org.apache.hadoop.fs.BlockLocation[] blkLocations = fs.getFileBlockLocations(f.getInternalFileStatus(),
-			start, len);
+		// second shortcut - if the internal status already has the information, return it.
+		// only if that is not the case, do the actual HDFS call (RPC to Name Node)
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = hadoopStatus instanceof org.apache.hadoop.fs.LocatedFileStatus
+				? ((org.apache.hadoop.fs.LocatedFileStatus) hadoopStatus).getBlockLocations()
+				: fs.getFileBlockLocations(hadoopStatus, start, len);
 
 		// Wrap up HDFS specific block location objects
 		final HadoopBlockLocation[] distBlkLocations = new HadoopBlockLocation[blkLocations.length];
@@ -159,7 +167,7 @@ public class HadoopFileSystem extends FileSystem {
 
 		// Convert types
 		for (int i = 0; i < files.length; i++) {
-			files[i] = new HadoopFileStatus(hadoopFiles[i]);
+			files[i] = HadoopFileStatus.fromHadoopStatus(hadoopFiles[i]);
 		}
 
 		return files;
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java
new file mode 100644
index 0000000..5ee1f6a
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/LocatedHadoopFileStatus.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.LocatedFileStatus;
+
+/**
+ * Concrete implementation of the {@link LocatedFileStatus} interface for the
+ * Hadoop Distributed File System.
+ */
+public final class LocatedHadoopFileStatus extends HadoopFileStatus implements LocatedFileStatus {
+
+	/**
+	 * Creates a new located file status from an HDFS file status.
+	 */
+	public LocatedHadoopFileStatus(org.apache.hadoop.fs.LocatedFileStatus fileStatus) {
+		super(fileStatus);
+	}
+
+	@Override
+	public BlockLocation[] getBlockLocations() {
+		final org.apache.hadoop.fs.BlockLocation[] hadoopLocations =
+				((org.apache.hadoop.fs.LocatedFileStatus) getInternalFileStatus()).getBlockLocations();
+
+		final HadoopBlockLocation[] locations = new HadoopBlockLocation[hadoopLocations.length];
+		for (int i = 0; i < locations.length; i++) {
+			locations[i] = new HadoopBlockLocation(hadoopLocations[i]);
+		}
+		return locations;
+	}
+}


[flink] 04/11: [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a5b0d3297748c1be47ad579a88f24df2255a8df1
Author: Jiangjie (Becket) Qin <ji...@alibaba-inc.com>
AuthorDate: Mon Sep 14 23:53:21 2020 +0200

    [FLINK-17393][connectors] Wakeup the SplitFetchers more elegantly.
    
    This closes #13366
---
 .../base/source/reader/SourceReaderBase.java       |   3 +-
 .../base/source/reader/fetcher/FetchTask.java      |  30 +--
 .../base/source/reader/fetcher/SplitFetcher.java   |  26 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   3 +-
 .../FutureCompletingBlockingQueue.java             | 268 +++++++++++++++++----
 .../source/reader/fetcher/SplitFetcherTest.java    |   8 +-
 .../source/reader/mocks/TestingSplitReader.java    |  13 +-
 .../FutureCompletingBlockingQueueTest.java         | 117 ++++++++-
 8 files changed, 378 insertions(+), 90 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 02b7a7c..979afb2 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -43,7 +43,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkState;
@@ -66,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private final FutureNotifier futureNotifier;
 
 	/** A queue to buffer the elements fetched by the fetcher thread. */
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** The state of the splits. */
 	private final Map<String, SplitContext<T, SplitStateT>> splitStates;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
index 30835ce..530add1 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/FetchTask.java
@@ -21,10 +21,10 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import java.io.IOException;
 import java.util.Collection;
-import java.util.concurrent.BlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -32,22 +32,22 @@ import java.util.function.Consumer;
  */
 class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 	private final SplitReader<E, SplitT> splitReader;
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 	private final Consumer<Collection<String>> splitFinishedCallback;
-	private final Thread runningThread;
+	private final int fetcherIndex;
 	private volatile RecordsWithSplitIds<E> lastRecords;
 	private volatile boolean wakeup;
 
 	FetchTask(
-		SplitReader<E, SplitT> splitReader,
-		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		Consumer<Collection<String>> splitFinishedCallback,
-		Thread runningThread) {
+			SplitReader<E, SplitT> splitReader,
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			Consumer<Collection<String>> splitFinishedCallback,
+			int fetcherIndex) {
 		this.splitReader = splitReader;
 		this.elementsQueue = elementsQueue;
 		this.splitFinishedCallback = splitFinishedCallback;
 		this.lastRecords = null;
-		this.runningThread = runningThread;
+		this.fetcherIndex = fetcherIndex;
 		this.wakeup = false;
 	}
 
@@ -61,10 +61,11 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			if (!isWakenUp()) {
 				// The order matters here. We must first put the last records into the queue.
 				// This ensures the handling of the fetched records is atomic to wakeup.
-				elementsQueue.put(lastRecords);
-				// The callback does not throw InterruptedException.
-				splitFinishedCallback.accept(lastRecords.finishedSplits());
-				lastRecords = null;
+				if (elementsQueue.put(fetcherIndex, lastRecords)) {
+					// The callback does not throw InterruptedException.
+					splitFinishedCallback.accept(lastRecords.finishedSplits());
+					lastRecords = null;
+				}
 			}
 		} finally {
 			// clean up the potential wakeup effect. It is possible that the fetcher is waken up
@@ -72,7 +73,6 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			// running thread will be interrupted. The next invocation of run() will see that and
 			// just skip.
 			if (isWakenUp()) {
-				Thread.interrupted();
 				wakeup = false;
 			}
 		}
@@ -93,12 +93,12 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
 			splitReader.wakeUp();
 		} else {
 			// The task might be blocking on enqueuing the records, just interrupt.
-			runningThread.interrupt();
+			elementsQueue.wakeUpPuttingThread(fetcherIndex);
 		}
 	}
 
 	private boolean isWakenUp() {
-		return wakeup || runningThread.isInterrupted();
+		return wakeup;
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
index 35deeba..fa1442e 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcher.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,7 +34,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.BlockingDeque;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -50,21 +50,20 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 	private final Map<String, SplitT> assignedSplits;
 	/** The current split assignments for this fetcher. */
 	private final Queue<SplitsChange<SplitT>> splitChanges;
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 	private final SplitReader<E, SplitT> splitReader;
 	private final Runnable shutdownHook;
 	private final AtomicBoolean wakeUp;
 	private final AtomicBoolean closed;
 	private FetchTask<E, SplitT> fetchTask;
-	private volatile Thread runningThread;
 	private volatile SplitFetcherTask runningTask = null;
 	private volatile boolean isIdle;
 
 	SplitFetcher(
-		int id,
-		BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
-		SplitReader<E, SplitT> splitReader,
-		Runnable shutdownHook) {
+			int id,
+			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
+			SplitReader<E, SplitT> splitReader,
+			Runnable shutdownHook) {
 
 		this.id = id;
 		this.taskQueue = new LinkedBlockingDeque<>();
@@ -83,14 +82,13 @@ public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
 		LOG.info("Starting split fetcher {}", id);
 		try {
 			// Remove the split from the assignments if it is already done.
-			runningThread = Thread.currentThread();
 			this.fetchTask = new FetchTask<>(
-				splitReader,
-				elementsQueue,
-				ids -> {
-					ids.forEach(this::removeAssignedSplit);
-					updateIsIdle();
-				}, runningThread);
+					splitReader,
+					elementsQueue,
+					ids -> {
+						ids.forEach(assignedSplits::remove);
+						updateIsIdle();
+					}, id);
 			while (!closed.get()) {
 				runOnce();
 			}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 61bada1..822a9a9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -32,7 +32,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -66,7 +65,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 	private final AtomicReference<Throwable> uncaughtFetcherException;
 
 	/** The element queue that the split fetchers will put elements into. */
-	private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
+	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
 	/** A map keeping track of all the split fetchers. */
 	protected final Map<Integer, SplitFetcher<E, SplitT>> fetchers;
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index de51af1..ea0f030 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,83 +18,257 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
-import java.util.Collection;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Queue;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
- * A BlockingQueue that allows a consuming thread to be notified asynchronously on element
- * availability when the queue is empty.
- *
- * <p>Implementation wise, it is a subclass of {@link LinkedBlockingQueue} that ensures all
- * the methods adding elements into the queue will complete the elements availability future.
- *
- * <p>The overriding methods must first put the elements into the queue then check and complete
- * the future if needed. This is required to ensure the thread waiting for more messages will
- * not lose a notification.
+ * A custom implementation of blocking queue with the following features.
+ * <ul>
+ *     <li>
+ *         It allows a consuming thread to be notified asynchronously on element availability when the
+ *         queue is empty.
+ *     </li>
+ *     <li>
+ *         Allows the putting threads to be gracefully waken up without interruption.
+ *     </li>
+ * </ul>
  *
  * @param <T> the type of the elements in the queue.
  */
-public class FutureCompletingBlockingQueue<T> extends LinkedBlockingQueue<T> {
-
+public class FutureCompletingBlockingQueue<T> {
+	private final int capacity;
 	private final FutureNotifier futureNotifier;
+
+	/** The element queue. */
+	private final Queue<T> queue;
+	/** The lock for synchronization. */
+	private final Lock lock;
+	/** The per-thread conditions that are waiting on putting elements. */
+	private final Queue<Condition> notFull;
+	/** The shared conditions for getting elements. */
+	private final Condition notEmpty;
+	/** The per-thread conditions and wakeUp flags. */
+	private ConditionAndFlag[] putConditionAndFlags;
+
 	/**
-	 * The default capacity for {@link LinkedBlockingQueue}.
+	 * The default capacity for the queue.
 	 */
-	private static final Integer DEFAULT_CAPACITY = 10000;
+	private static final Integer DEFAULT_CAPACITY = 1;
 
 	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
 		this(futureNotifier, DEFAULT_CAPACITY);
 	}
 
 	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
-		super(capacity);
+		this.capacity = capacity;
 		this.futureNotifier = futureNotifier;
+		this.queue = new ArrayDeque<>(capacity);
+		this.lock = new ReentrantLock();
+		this.putConditionAndFlags = new ConditionAndFlag[1];
+		this.notFull = new ArrayDeque<>();
+		this.notEmpty = lock.newCondition();
+	}
+
+	/**
+	 * Put an element into the queue. The thread blocks if the queue is full.
+	 *
+	 * @param threadIndex the index of the thread.
+	 * @param element the element to put.
+	 * @return true if the element has been successfully put into the queue, false otherwise.
+	 * @throws InterruptedException when the thread is interrupted.
+	 */
+	public boolean put(int threadIndex, T element) throws InterruptedException {
+		if (element == null) {
+			throw new NullPointerException();
+		}
+		lock.lockInterruptibly();
+		try {
+			while (queue.size() >= capacity) {
+				if (getAndResetWakeUpFlag(threadIndex)) {
+					return false;
+				}
+				waitOnPut(threadIndex);
+			}
+			enqueue(element);
+			return true;
+		} finally {
+			lock.unlock();
+		}
 	}
 
-	@Override
-	public void put(T t) throws InterruptedException {
-		super.put(t);
+	/**
+	 * Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 *
+	 * @return the first element in the queue.
+	 * @throws InterruptedException when the thread is interrupted.
+	 */
+	public T take() throws InterruptedException{
+		lock.lock();
+		try {
+			while (queue.size() == 0) {
+				notEmpty.await();
+			}
+			return dequeue();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Get and remove the first element from the queue. Null is returned if the queue is empty.
+	 *
+	 * @return the first element from the queue, or Null if the queue is empty.
+	 */
+	public T poll() {
+		lock.lock();
+		try {
+			if (queue.size() == 0) {
+				return null;
+			}
+			return dequeue();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Get the first element from the queue without removing it.
+	 *
+	 * @return the first element in the queue, or Null if the queue is empty.
+	 */
+	public T peek() {
+		lock.lock();
+		try {
+			return queue.peek();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public int size() {
+		lock.lock();
+		try {
+			return queue.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public boolean isEmpty() {
+		lock.lock();
+		try {
+			return queue.isEmpty();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public int remainingCapacity() {
+		lock.lock();
+		try {
+			return capacity - queue.size();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	public void wakeUpPuttingThread(int threadIndex) {
+		lock.lock();
+		try {
+			maybeCreateCondition(threadIndex);
+			ConditionAndFlag caf = putConditionAndFlags[threadIndex];
+			if (caf != null) {
+				caf.setWakeUp(true);
+				caf.condition().signal();
+			}
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	// --------------- private helpers -------------------------
+
+	private void enqueue(T element) {
+		int sizeBefore = queue.size();
+		queue.add(element);
 		futureNotifier.notifyComplete();
+		if (sizeBefore == 0) {
+			notEmpty.signal();
+		}
+		if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
+			signalNextPutter();
+		}
 	}
 
-	@Override
-	public boolean offer(T t, long timeout, TimeUnit unit) throws InterruptedException {
-		if (super.offer(t, timeout, unit)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private T dequeue() {
+		int sizeBefore = queue.size();
+		T element = queue.poll();
+		if (sizeBefore == capacity && !notFull.isEmpty()) {
+			signalNextPutter();
 		}
+		if (sizeBefore > 1) {
+			notEmpty.signal();
+		}
+		return element;
 	}
 
-	@Override
-	public boolean offer(T t) {
-		if (super.offer(t)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private void waitOnPut(int fetcherIndex) throws InterruptedException {
+		maybeCreateCondition(fetcherIndex);
+		Condition cond = putConditionAndFlags[fetcherIndex].condition();
+		notFull.add(cond);
+		cond.await();
+	}
+
+	private void signalNextPutter() {
+		if (!notFull.isEmpty()) {
+			notFull.poll().signal();
 		}
 	}
 
-	@Override
-	public boolean add(T t) {
-		if (super.add(t)) {
-			futureNotifier.notifyComplete();
-			return true;
-		} else {
-			return false;
+	private void maybeCreateCondition(int threadIndex) {
+		if (putConditionAndFlags.length < threadIndex + 1) {
+			putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
+		}
+
+		if (putConditionAndFlags[threadIndex] == null) {
+			putConditionAndFlags[threadIndex] = new ConditionAndFlag(lock.newCondition());
 		}
 	}
 
-	@Override
-	public boolean addAll(Collection<? extends T> c) {
-		if (super.addAll(c)) {
-			futureNotifier.notifyComplete();
+	private boolean getAndResetWakeUpFlag(int threadIndex) {
+		maybeCreateCondition(threadIndex);
+		if (putConditionAndFlags[threadIndex].getWakeUp()) {
+			putConditionAndFlags[threadIndex].setWakeUp(false);
 			return true;
-		} else {
-			return false;
+		}
+		return false;
+	}
+
+	// --------------- private per thread state ------------
+
+	private static class ConditionAndFlag {
+		private final Condition cond;
+		private boolean wakeUp;
+
+		private ConditionAndFlag(Condition cond) {
+			this.cond = cond;
+			this.wakeUp = false;
+		}
+
+		private Condition condition() {
+			return cond;
+		}
+
+		private boolean getWakeUp() {
+			return wakeUp;
+		}
+
+		private void setWakeUp(boolean value) {
+			wakeUp = value;
 		}
 	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index e9c2ad2..eef8328 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.connector.base.source.reader.fetcher;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Test;
 
@@ -29,8 +31,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -45,9 +45,11 @@ public class SplitFetcherTest {
 	private static final int NUM_RECORDS_PER_SPLIT = 10_000;
 	private static final int INTERRUPT_RECORDS_INTERVAL = 10;
 	private static final int NUM_TOTAL_RECORDS = NUM_RECORDS_PER_SPLIT * NUM_SPLITS;
+
 	@Test
 	public void testWakeup() throws InterruptedException {
-		BlockingQueue<RecordsWithSplitIds<int[]>> elementQueue = new ArrayBlockingQueue<>(1);
+		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
+			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
 				new SplitFetcher<>(
 						0,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
index ede92eb..0d202f7 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/TestingSplitReader.java
@@ -46,11 +46,10 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 		if (!fetches.isEmpty()) {
 			return fetches.removeFirst();
 		} else {
-			// block until interrupted
+			// block until woken up
 			synchronized (fetches) {
-				while (true) {
-					fetches.wait();
-				}
+				fetches.wait();
+				return null;
 			}
 		}
 	}
@@ -61,5 +60,9 @@ public class TestingSplitReader<E, SplitT extends SourceSplit> implements SplitR
 	}
 
 	@Override
-	public void wakeUp() {}
+	public void wakeUp() {
+		synchronized (fetches) {
+			fetches.notifyAll();
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ad74f2a..c1bde50 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -20,16 +20,129 @@ package org.apache.flink.connector.base.source.reader.synchronization;
 
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
+	private static final Integer DEFAULT_CAPACITY = 1;
+	private static final Integer SPECIFIED_CAPACITY = 20000;
+
+	@Test
+	public void testBasics() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
 
+		CompletableFuture<Void> future = futureNotifier.future();
+		assertTrue(queue.isEmpty());
+		assertEquals(0, queue.size());
 
-	private static final Integer DEFAULT_CAPACITY = 10000;
-	private static final Integer SPECIFIED_CAPACITY = 20000;
+		queue.put(0, 1234);
+
+		assertTrue(future.isDone());
+		assertEquals(1, queue.size());
+		assertFalse(queue.isEmpty());
+		assertEquals(4, queue.remainingCapacity());
+		assertNotNull(queue.peek());
+		assertEquals(1234, (int) queue.peek());
+		assertEquals(1234, (int) queue.poll());
+
+		assertEquals(0, queue.size());
+		assertTrue(queue.isEmpty());
+		assertEquals(5, queue.remainingCapacity());
+	}
+
+	@Test
+	public void testPoll() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+		queue.put(0, 1234);
+		Integer value = queue.poll();
+		assertNotNull(value);
+		assertEquals(1234, (int) value);
+	}
+
+	@Test
+	public void testWakeUpPut() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+
+		CountDownLatch latch = new CountDownLatch(1);
+		new Thread(() -> {
+			try {
+				assertTrue(queue.put(0, 1234));
+				assertFalse(queue.put(0, 1234));
+				latch.countDown();
+			} catch (InterruptedException e) {
+				fail("Interrupted unexpectedly.");
+			}
+		}).start();
+
+		queue.wakeUpPuttingThread(0);
+		latch.await();
+		assertEquals(0, latch.getCount());
+	}
+
+	@Test
+	public void testConcurrency() throws InterruptedException {
+		FutureNotifier futureNotifier = new FutureNotifier();
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		final int numValuesPerThread = 10000;
+		final int numPuttingThreads = 5;
+		List<Thread> threads = new ArrayList<>();
+
+		for (int i = 0; i < numPuttingThreads; i++) {
+			final int index = i;
+			Thread t = new Thread(() -> {
+				for (int j = 0; j < numValuesPerThread; j++) {
+					int base = index * numValuesPerThread;
+					try {
+						queue.put(index, base + j);
+					} catch (InterruptedException e) {
+						fail("putting thread interrupted.");
+					}
+				}
+			});
+			t.start();
+			threads.add(t);
+		}
+
+		BitSet bitSet = new BitSet();
+		AtomicInteger count = new AtomicInteger(0);
+		for (int i = 0; i < 5; i++) {
+			Thread t = new Thread(() -> {
+				while (count.get() < numPuttingThreads * numValuesPerThread) {
+					Integer value = queue.poll();
+					if (value == null) {
+						continue;
+					}
+					count.incrementAndGet();
+					if (bitSet.get(value)) {
+						fail("Value " + value + " has been consumed before");
+					}
+					synchronized (bitSet) {
+						bitSet.set(value);
+					}
+				}});
+			t.start();
+			threads.add(t);
+		}
+		for (Thread t : threads) {
+			t.join();
+		}
+	}
 
 	@Test
 	public void testFutureCompletingBlockingQueueConstructor() {


[flink] 10/11: [FLINK-19245][connectors] Set default capacity for FutureCompletingBlockingQueue.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cef8a587d7fd2fe64cc644da5ed095d82e46f631
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 16:05:37 2020 +0200

    [FLINK-19245][connectors] Set default capacity for FutureCompletingBlockingQueue.
---
 .../apache/flink/connector/base/source/reader/SourceReaderOptions.java  | 2 +-
 .../source/reader/synchronization/FutureCompletingBlockingQueue.java    | 2 +-
 .../reader/synchronization/FutureCompletingBlockingQueueTest.java       | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
index 508b347..dae1a40 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderOptions.java
@@ -38,7 +38,7 @@ public class SourceReaderOptions {
 		ConfigOptions
 				.key("source.reader.element.queue.capacity")
 				.intType()
-				.defaultValue(1)
+				.defaultValue(2)
 				.withDescription("The capacity of the element queue in the source reader.");
 
 	// --------------- final fields ----------------------
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index c89b682..1fe1985 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -80,7 +80,7 @@ public class FutureCompletingBlockingQueue<T> {
 	/**
 	 * The default capacity for the queue.
 	 */
-	private static final int DEFAULT_CAPACITY = 1;
+	private static final int DEFAULT_CAPACITY = 2;
 
 	// ------------------------------------------------------------------------
 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index ef056d9e..2a191d2 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -40,7 +40,7 @@ import static org.junit.Assert.fail;
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
-	private static final Integer DEFAULT_CAPACITY = 1;
+	private static final Integer DEFAULT_CAPACITY = 2;
 	private static final Integer SPECIFIED_CAPACITY = 20000;
 
 	@Test


[flink] 09/11: [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4ea95782b4c6a2538153d4d16ad3f4839c7de0fb
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 23:48:01 2020 +0200

    [FLINK-19223][connectors] Simplify Availability Future Model in Base Connector
    
    This implements a model closer to the AvailabilityListener and AvailabilityHelper in the flink-runtime.
    
    This closes #13385
---
 .../SingleThreadMultiplexSourceReaderBase.java     |   5 +-
 .../base/source/reader/SourceReaderBase.java       |  21 +-
 .../reader/fetcher/SingleThreadFetcherManager.java |   4 +-
 .../source/reader/fetcher/SplitFetcherManager.java |   5 +-
 .../FutureCompletingBlockingQueue.java             | 259 +++++++++++++++++----
 .../reader/synchronization/FutureNotifier.java     |  66 ------
 .../base/source/reader/SourceReaderBaseTest.java   |  13 +-
 .../source/reader/fetcher/SplitFetcherTest.java    |  36 ++-
 .../base/source/reader/mocks/MockBaseSource.java   |   5 +-
 .../base/source/reader/mocks/MockSourceReader.java |   6 +-
 .../FutureCompletingBlockingQueueTest.java         |  32 ++-
 .../reader/synchronization/FutureNotifierTest.java | 131 -----------
 12 files changed, 261 insertions(+), 322 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
index 3239f28..ab87db0 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.function.Supplier;
 
@@ -40,16 +39,14 @@ public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends
 	extends SourceReaderBase<E, T, SplitT, SplitStateT> {
 
 	public SingleThreadMultiplexSourceReaderBase(
-		FutureNotifier futureNotifier,
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 		Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
 		RecordEmitter<E, T, SplitStateT> recordEmitter,
 		Configuration config,
 		SourceReaderContext context) {
 		super(
-			futureNotifier,
 			elementsQueue,
-			new SingleThreadFetcherManager<>(futureNotifier, elementsQueue, splitReaderSupplier),
+			new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier),
 			recordEmitter,
 			config,
 			context);
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 0305e2d..fb4e6df9 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -29,7 +29,6 @@ import org.apache.flink.connector.base.source.event.NoMoreSplitsEvent;
 import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.InputStatus;
 
 import org.slf4j.Logger;
@@ -61,9 +60,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		implements SourceReader<T, SplitT> {
 	private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
 
-	/** A future notifier to notify when this reader requires attention. */
-	private final FutureNotifier futureNotifier;
-
 	/** A queue to buffer the elements fetched by the fetcher thread. */
 	private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
 
@@ -94,13 +90,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private boolean noMoreSplitsAssignment;
 
 	public SourceReaderBase(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			SplitFetcherManager<E, SplitT> splitFetcherManager,
 			RecordEmitter<E, T, SplitStateT> recordEmitter,
 			Configuration config,
 			SourceReaderContext context) {
-		this.futureNotifier = futureNotifier;
 		this.elementsQueue = elementsQueue;
 		this.splitFetcherManager = splitFetcherManager;
 		this.recordEmitter = recordEmitter;
@@ -203,18 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	@Override
 	public CompletableFuture<Void> isAvailable() {
-		// The order matters here. We first get the future. After this point, if the queue
-		// is empty or there is no error in the split fetcher manager, we can ensure that
-		// the future will be completed by the fetcher once it put an element into the element queue,
-		// or it will be completed when an error occurs.
-		CompletableFuture<Void> future = futureNotifier.future();
-		splitFetcherManager.checkErrors();
-		if (!elementsQueue.isEmpty()) {
-			// The fetcher got the new elements after the last poll, or their is a finished split.
-			// Simply complete the future and return;
-			futureNotifier.notifyComplete();
-		}
-		return future;
+		return currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : elementsQueue.getAvailabilityFuture();
 	}
 
 	@Override
@@ -239,7 +222,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		if (sourceEvent instanceof NoMoreSplitsEvent) {
 			LOG.info("Reader received NoMoreSplits event.");
 			noMoreSplitsAssignment = true;
-			futureNotifier.notifyComplete();
+			elementsQueue.notifyAvailable();
 		}
 	}
 
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
index bd5879f..339c533 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SingleThreadFetcherManager.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceSplit;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.List;
 import java.util.function.Supplier;
@@ -34,10 +33,9 @@ public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
 		extends SplitFetcherManager<E, SplitT> {
 
 	public SingleThreadFetcherManager(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
-		super(futureNotifier, elementsQueue, splitReaderSupplier);
+		super(elementsQueue, splitReaderSupplier);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 26d92e3..ffac523 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -23,7 +23,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.util.ThrowableCatchingRunnable;
 
 import org.slf4j.Logger;
@@ -79,12 +78,10 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 	/**
 	 * Create a split fetcher manager.
 	 *
-	 * @param futureNotifier a notifier to notify the complete of a future.
 	 * @param elementsQueue the queue that split readers will put elements into.
 	 * @param splitReaderFactory a supplier that could be used to create split readers.
 	 */
 	public SplitFetcherManager(
-			FutureNotifier futureNotifier,
 			FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
 			Supplier<SplitReader<E, SplitT>> splitReaderFactory) {
 		this.elementsQueue = elementsQueue;
@@ -96,7 +93,7 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 					// Add the exception to the exception list.
 					uncaughtFetcherException.get().addSuppressed(t);
 					// Wake up the main thread to let it know the exception.
-					futureNotifier.notifyComplete();
+					elementsQueue.notifyAvailable();
 				}
 			}
 		};
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
index dcbb66e..c89b682 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueue.java
@@ -18,62 +18,174 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.lang.reflect.Field;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
- * A custom implementation of blocking queue with the following features.
- * <ul>
- *     <li>
- *         It allows a consuming thread to be notified asynchronously on element availability when the
- *         queue is empty.
- *     </li>
- *     <li>
- *         Allows the putting threads to be gracefully waken up without interruption.
- *     </li>
- * </ul>
+ * A custom implementation of blocking queue in combination with a {@link CompletableFuture} that is
+ * used in the hand-over of data from a producing thread to a consuming thread.
+ * This FutureCompletingBlockingQueue has the following features:
+ *
+ * <h3>Consumer Notifications</h3>
+ *
+ * <p>Rather than letting consumers block on the {@link #take()} method, or have them poll the
+ * {@link #poll()} method, this queue offers a {@link CompletableFuture} (obtained via the
+ * {@link #getAvailabilityFuture()} method) that gets completed whenever the queue is non-empty.
+ * A consumer can thus subscribe to asynchronous notifications for availability by adding a handler
+ * to the obtained {@code CompletableFuture}.
+ *
+ * <p>The future may also be completed by an explicit call to {@link #notifyAvailable()}. That way the
+ * consumer may be notified of a situation/condition without adding an element to the queue.
+ *
+ * <p>Availability is reset when a call to {@link #poll()} (or {@link #take()}) finds an empty queue
+ * or results in an empty queue (takes the last element).
+ *
+ * <p>Note that this model generally assumes that <i>false positives</i> are okay, meaning that the
+ * availability future completes despite there being no data available in the queue. The consumer is
+ * responsible for polling data and obtaining another future to wait on. This is similar to the way
+ * that Java's Monitors and Conditions can have the <i>spurious wakeup</i> of the waiting threads
+ * and commonly need to be used in a loop with the waiting condition.
+ *
+ * <h3>Producer Wakeup</h3>
+ *
+ * <p>The queue supports gracefully waking up producing threads that are blocked due to the queue
+ * capacity limits, without interrupting the thread. This is done via the {@link #wakeUpPuttingThread(int)}
+ * method.
  *
  * @param <T> the type of the elements in the queue.
  */
 public class FutureCompletingBlockingQueue<T> {
+
+	/**
+	 * A constant future that is complete, indicating availability. Using this constant in cases that
+	 * are guaranteed available helps short-circuiting some checks and avoiding volatile memory operations.
+	 */
+	public static final CompletableFuture<Void> AVAILABLE = getAvailableFuture();
+
+	/**
+	 * The default capacity for the queue.
+	 */
+	private static final int DEFAULT_CAPACITY = 1;
+
+	// ------------------------------------------------------------------------
+
+	/** The maximum capacity of the queue. */
 	private final int capacity;
-	private final FutureNotifier futureNotifier;
 
-	/** The element queue. */
-	private final Queue<T> queue;
+	/** The availability future. This doubles as a "non empty" condition. This value is never null.*/
+	private CompletableFuture<Void> currentFuture;
+
 	/** The lock for synchronization. */
 	private final Lock lock;
+
+	/** The element queue. */
+	@GuardedBy("lock")
+	private final Queue<T> queue;
+
 	/** The per-thread conditions that are waiting on putting elements. */
+	@GuardedBy("lock")
 	private final Queue<Condition> notFull;
-	/** The shared conditions for getting elements. */
-	private final Condition notEmpty;
+
 	/** The per-thread conditions and wakeUp flags. */
+	@GuardedBy("lock")
 	private ConditionAndFlag[] putConditionAndFlags;
 
-	/**
-	 * The default capacity for the queue.
-	 */
-	private static final Integer DEFAULT_CAPACITY = 1;
-
-	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier) {
-		this(futureNotifier, DEFAULT_CAPACITY);
+	public FutureCompletingBlockingQueue() {
+		this(DEFAULT_CAPACITY);
 	}
 
-	public FutureCompletingBlockingQueue(FutureNotifier futureNotifier, int capacity) {
+	public FutureCompletingBlockingQueue(int capacity) {
+		checkArgument(capacity > 0, "capacity must be > 0");
 		this.capacity = capacity;
-		this.futureNotifier = futureNotifier;
 		this.queue = new ArrayDeque<>(capacity);
 		this.lock = new ReentrantLock();
 		this.putConditionAndFlags = new ConditionAndFlag[1];
 		this.notFull = new ArrayDeque<>();
-		this.notEmpty = lock.newCondition();
+
+		// initially the queue is empty and thus unavailable
+		this.currentFuture = new CompletableFuture<>();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Future / Notification logic
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns the availability future. If the queue is non-empty, then this future will already
+	 * be complete. Otherwise the obtained future is guaranteed to get completed the next time
+	 * the queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
+	 *
+	 * <p>It is important that a completed future is no guarantee that the next call to
+	 * {@link #poll()} will return a non-null element. If there are concurrent consumers, another
+	 * consumer may have taken the available element. Or there was no element in the first place,
+	 * because the future was completed through a call to {@link #notifyAvailable()}.
+	 *
+	 * <p>For that reason, it is important to call this method (to obtain a new future) every
+	 * time again after {@link #poll()} returned null and you want to wait for data.
+	 */
+	public CompletableFuture<Void> getAvailabilityFuture() {
+		return currentFuture;
+	}
+
+	/**
+	 * Makes sure the availability future is complete, if it is not complete already.
+	 * All futures returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to
+	 * be completed.
+	 *
+	 * <p>All future calls to {@link #getAvailabilityFuture()} will return a completed future, until the point
+	 * that the availability is reset via calls to {@link #poll()} that leave the queue empty.
+	 */
+	public void notifyAvailable() {
+		lock.lock();
+		try {
+			moveToAvailable();
+		} finally {
+			lock.unlock();
+		}
+	}
+
+	/**
+	 * Internal utility to make sure that the current future and all subsequently obtained futures are complete (until reset).
+	 */
+	@GuardedBy("lock")
+	private void moveToAvailable() {
+		final CompletableFuture<Void> current = currentFuture;
+		if (current != AVAILABLE) {
+			currentFuture = AVAILABLE;
+			current.complete(null);
+		}
 	}
 
 	/**
+	 * Makes sure the availability future is incomplete, if it was complete before.
+	 */
+	@GuardedBy("lock")
+	private void moveToUnAvailable() {
+		if (currentFuture == AVAILABLE) {
+			currentFuture = new CompletableFuture<>();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Blocking Queue Logic
+	// ------------------------------------------------------------------------
+
+	/**
 	 * Put an element into the queue. The thread blocks if the queue is full.
 	 *
 	 * @param threadIndex the index of the thread.
@@ -101,25 +213,40 @@ public class FutureCompletingBlockingQueue<T> {
 	}
 
 	/**
-	 * Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 * <b>Warning:</b> This is a dangerous method and should only be used for testing convenience.
+	 * A method that blocks until availability does not go together well with the concept of
+	 * asynchronous notifications and non-blocking polling.
+	 *
+	 * <p>Get and remove the first element from the queue. The call blocks if the queue is empty.
+	 * The problem with this method is that it may loop internally until an element is available and
+	 * that way eagerly reset the availability future. If a consumer thread is blocked in taking an
+	 * element, it will receive availability notifications from {@link #notifyAvailable()} and immediately
+	 * reset them by calling {@link #poll()} and finding the queue empty.
 	 *
 	 * @return the first element in the queue.
 	 * @throws InterruptedException when the thread is interrupted.
 	 */
-	public T take() throws InterruptedException{
-		lock.lock();
-		try {
-			while (queue.size() == 0) {
-				notEmpty.await();
+	@VisibleForTesting
+	public T take() throws InterruptedException {
+		T next;
+		while ((next = poll()) == null) {
+			// use the future to wait for availability to avoid busy waiting
+			try {
+				getAvailabilityFuture().get();
+			} catch (ExecutionException | CompletionException e) {
+				// this should never happen, but we propagate just in case
+				throw new FlinkRuntimeException("exception in queue future completion", e);
 			}
-			return dequeue();
-		} finally {
-			lock.unlock();
 		}
+		return next;
 	}
 
 	/**
-	 * Get and remove the first element from the queue. Null is retuned if the queue is empty.
+	 * Get and remove the first element from the queue. Null is returned if the queue is empty.
+	 * If this makes the queue empty (takes the last element) or finds the queue already empty,
+	 * then this resets the availability notifications. The next call to {@link #getAvailabilityFuture()}
+	 * will then return a non-complete future that completes only the next time that the queue
+	 * becomes non-empty or the {@link #notifyAvailable()} method is called.
 	 *
 	 * @return the first element from the queue, or Null if the queue is empty.
 	 */
@@ -127,6 +254,7 @@ public class FutureCompletingBlockingQueue<T> {
 		lock.lock();
 		try {
 			if (queue.size() == 0) {
+				moveToUnAvailable();
 				return null;
 			}
 			return dequeue();
@@ -149,6 +277,9 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Gets the size of the queue.
+	 */
 	public int size() {
 		lock.lock();
 		try {
@@ -158,6 +289,9 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Checks whether the queue is empty.
+	 */
 	public boolean isEmpty() {
 		lock.lock();
 		try {
@@ -167,6 +301,10 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Checks the remaining capacity in the queue. That is the difference between the maximum capacity
+	 * and the current number of elements in the queue.
+	 */
 	public int remainingCapacity() {
 		lock.lock();
 		try {
@@ -176,6 +314,16 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	/**
+	 * Gracefully wakes up the thread with the given {@code threadIndex} if it is blocked in
+	 * adding an element to the queue. If the thread is blocked in {@link #put(int, Object)} it will
+	 * immediately return from the method with a return value of false.
+	 *
+	 * <p>If this method is called, the next time the thread with the given index is about to be blocked
+	 * in adding an element, it may immediately wake up and return.
+	 *
+	 * @param threadIndex The number identifying the thread.
+	 */
 	public void wakeUpPuttingThread(int threadIndex) {
 		lock.lock();
 		try {
@@ -190,36 +338,34 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
-	public void notifyAvailable() {
-		futureNotifier.notifyComplete();
-	}
-
 	// --------------- private helpers -------------------------
 
+	@GuardedBy("lock")
 	private void enqueue(T element) {
-		int sizeBefore = queue.size();
+		final int sizeBefore = queue.size();
 		queue.add(element);
-		futureNotifier.notifyComplete();
 		if (sizeBefore == 0) {
-			notEmpty.signal();
+			moveToAvailable();
 		}
 		if (sizeBefore < capacity - 1 && !notFull.isEmpty()) {
 			signalNextPutter();
 		}
 	}
 
+	@GuardedBy("lock")
 	private T dequeue() {
-		int sizeBefore = queue.size();
-		T element = queue.poll();
+		final int sizeBefore = queue.size();
+		final T element = queue.poll();
 		if (sizeBefore == capacity && !notFull.isEmpty()) {
 			signalNextPutter();
 		}
-		if (sizeBefore > 1) {
-			notEmpty.signal();
+		if (queue.isEmpty()) {
+			moveToUnAvailable();
 		}
 		return element;
 	}
 
+	@GuardedBy("lock")
 	private void waitOnPut(int fetcherIndex) throws InterruptedException {
 		maybeCreateCondition(fetcherIndex);
 		Condition cond = putConditionAndFlags[fetcherIndex].condition();
@@ -227,12 +373,14 @@ public class FutureCompletingBlockingQueue<T> {
 		cond.await();
 	}
 
+	@GuardedBy("lock")
 	private void signalNextPutter() {
 		if (!notFull.isEmpty()) {
 			notFull.poll().signal();
 		}
 	}
 
+	@GuardedBy("lock")
 	private void maybeCreateCondition(int threadIndex) {
 		if (putConditionAndFlags.length < threadIndex + 1) {
 			putConditionAndFlags = Arrays.copyOf(putConditionAndFlags, threadIndex + 1);
@@ -243,6 +391,7 @@ public class FutureCompletingBlockingQueue<T> {
 		}
 	}
 
+	@GuardedBy("lock")
 	private boolean getAndResetWakeUpFlag(int threadIndex) {
 		maybeCreateCondition(threadIndex);
 		if (putConditionAndFlags[threadIndex].getWakeUp()) {
@@ -275,4 +424,22 @@ public class FutureCompletingBlockingQueue<T> {
 			wakeUp = value;
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	private static CompletableFuture<Void> getAvailableFuture() {
+		// this is a way to obtain the AvailabilityProvider.AVAILABLE future until we decide to
+		// move the class from the runtime module to the core module
+		try {
+			final Class<?> clazz = Class.forName("org.apache.flink.runtime.io.AvailabilityProvider");
+			final Field field = clazz.getDeclaredField("AVAILABLE");
+			return (CompletableFuture<Void>) field.get(null);
+		}
+		catch (Throwable t) {
+			return CompletableFuture.completedFuture(null);
+		}
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
deleted file mode 100644
index 9330407..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifier.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * A class facilitating the asynchronous communication among threads.
- */
-public class FutureNotifier {
-	/** A future reference. */
-	private final AtomicReference<CompletableFuture<Void>> futureRef;
-
-	public FutureNotifier() {
-		this.futureRef = new AtomicReference<>(null);
-	}
-
-	/**
-	 * Get the future out of this notifier. The future will be completed when someone invokes
-	 * {@link #notifyComplete()}. If there is already an uncompleted future, that existing
-	 * future will be returned instead of a new one.
-	 *
-	 * @return a future that will be completed when {@link #notifyComplete()} is invoked.
-	 */
-	public CompletableFuture<Void> future() {
-		CompletableFuture<Void> prevFuture = futureRef.get();
-		if (prevFuture != null) {
-			// Someone has created a future for us, don't create a new one.
-			return prevFuture;
-		} else {
-			CompletableFuture<Void> newFuture = new CompletableFuture<>();
-			boolean newFutureSet = futureRef.compareAndSet(null, newFuture);
-			// If someone created a future after our previous check, use that future.
-			// Otherwise, use the new future.
-			return newFutureSet ? newFuture : future();
-		}
-	}
-
-	/**
-	 * Complete the future if there is one. This will release the thread that is waiting for data.
-	 */
-	public void notifyComplete() {
-		CompletableFuture<Void> future = futureRef.get();
-		// If there are multiple threads trying to complete the future, only the first one succeeds.
-		if (future != null && future.complete(null)) {
-			futureRef.compareAndSet(future, null);
-		}
-	}
-}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 0ec4297..84eeb4e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -33,7 +33,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import org.junit.Rule;
 import org.junit.Test;
@@ -62,12 +61,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		expectedException.expectMessage("One or more fetchers have encountered exception");
 		final String errMsg = "Testing Exception";
 
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 		// We have to handle split changes first, otherwise fetch will not be called.
 		try (MockSourceReader reader = new MockSourceReader(
-			futureNotifier,
 			elementsQueue,
 			() -> new SplitReader<int[], MockSourceSplit>() {
 				@Override
@@ -127,13 +124,11 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 
 	@Override
 	protected MockSourceReader createReader() {
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 		MockSplitReader mockSplitReader =
 			new MockSplitReader(2, true, true);
 		return new MockSourceReader(
-			futureNotifier,
 			elementsQueue,
 			() -> mockSplitReader,
 			getConfig(),
@@ -183,12 +178,10 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit>
 		final String splitId,
 		final RecordsWithSplitIds<E> records) throws Exception {
 
-		final FutureNotifier futureNotifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue =
-			new FutureCompletingBlockingQueue<>(futureNotifier);
+			new FutureCompletingBlockingQueue<>();
 
 		final SourceReader<E, TestingSourceSplit> reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(
-			futureNotifier,
 			elementsQueue,
 			() -> new TestingSplitReader<E, TestingSourceSplit>(records),
 			new PassThroughRecordEmitter<E, TestingSourceSplit>(),
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
index 6e27d95..c25490b 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
 import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.testutils.CheckedThread;
 
 import org.junit.Test;
@@ -84,28 +83,28 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesWhenGoingIdle() {
-		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
 			"test-split",
-			new FutureCompletingBlockingQueue<>(notifier),
+			queue,
 			new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
 		fetcher.runOnce();
 
 		assertTrue(fetcher.assignedSplits().isEmpty());
 		assertTrue(fetcher.isIdle());
-		assertTrue(notifier.future().isDone());
+		assertTrue(queue.getAvailabilityFuture().isDone());
 	}
 
 	@Test
 	public void testNotifiesOlderFutureWhenGoingIdle() {
-		final FutureNotifier notifier = new FutureNotifier();
+		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue = new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
-			"test-split",
-			new FutureCompletingBlockingQueue<>(notifier),
-			new TestingSplitReader<>(finishedSplitFetch("test-split")));
+				"test-split",
+				queue,
+				new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
-		final CompletableFuture<?> future = notifier.future();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
 
 		fetcher.runOnce();
 
@@ -116,9 +115,8 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesWhenGoingIdleConcurrent() throws Exception {
-		final FutureNotifier notifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-				new FutureCompletingBlockingQueue<>(notifier);
+				new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
 			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
@@ -128,7 +126,7 @@ public class SplitFetcherTest {
 		try {
 			fetcher.runOnce();
 
-			assertTrue(notifier.future().isDone());
+			assertTrue(queue.getAvailabilityFuture().isDone());
 		} finally {
 			queueDrainer.shutdown();
 		}
@@ -136,16 +134,15 @@ public class SplitFetcherTest {
 
 	@Test
 	public void testNotifiesOlderFutureWhenGoingIdleConcurrent() throws Exception {
-		final FutureNotifier notifier = new FutureNotifier();
 		final FutureCompletingBlockingQueue<RecordsWithSplitIds<Object>> queue =
-			new FutureCompletingBlockingQueue<>(notifier);
+				new FutureCompletingBlockingQueue<>();
 		final SplitFetcher<Object, TestingSourceSplit> fetcher = createFetcherWithSplit(
-			"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
+				"test-split", queue, new TestingSplitReader<>(finishedSplitFetch("test-split")));
 
 		final QueueDrainerThread queueDrainer = new QueueDrainerThread(queue);
 		queueDrainer.start();
 
-		final CompletableFuture<?> future = notifier.future();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
 
 		try {
 			fetcher.runOnce();
@@ -164,7 +161,7 @@ public class SplitFetcherTest {
 		final int numTotalRecords = numRecordsPerSplit * numSplits;
 
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementQueue =
-			new FutureCompletingBlockingQueue<>(new FutureNotifier(), 1);
+			new FutureCompletingBlockingQueue<>(1);
 		SplitFetcher<int[], MockSourceSplit> fetcher =
 				new SplitFetcher<>(
 						0,
@@ -243,7 +240,7 @@ public class SplitFetcherTest {
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
 			final SplitReader<E, TestingSourceSplit> reader) {
-		return createFetcher(reader, new FutureCompletingBlockingQueue<>(new FutureNotifier()));
+		return createFetcher(reader, new FutureCompletingBlockingQueue<>());
 	}
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcher(
@@ -255,7 +252,7 @@ public class SplitFetcherTest {
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
 			final String splitId,
 			final SplitReader<E, TestingSourceSplit> reader) {
-		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(new FutureNotifier()), reader);
+		return createFetcherWithSplit(splitId, new FutureCompletingBlockingQueue<>(), reader);
 	}
 
 	private static <E> SplitFetcher<E, TestingSourceSplit> createFetcherWithSplit(
@@ -292,6 +289,7 @@ public class SplitFetcherTest {
 					queue.take();
 				}
 				catch (InterruptedException ignored) {
+					Thread.currentThread().interrupt();
 					// fall through the loop
 				}
 			}
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
index ae46286..2681e5a 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockBaseSource.java
@@ -30,7 +30,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -68,15 +67,13 @@ public class MockBaseSource implements Source<Integer, MockSourceSplit, List<Moc
 
 	@Override
 	public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
-		FutureNotifier futureNotifier = new FutureNotifier();
 		FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
-				new FutureCompletingBlockingQueue<>(futureNotifier);
+				new FutureCompletingBlockingQueue<>();
 
 		Configuration config = new Configuration();
 		config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
 		config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
 		return new MockSourceReader(
-				futureNotifier,
 				elementsQueue,
 				() -> new MockSplitReader(2, true, true),
 				config,
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
index 92a19ef..66022db 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSourceReader.java
@@ -25,7 +25,6 @@ import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
 import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
-import org.apache.flink.connector.base.source.reader.synchronization.FutureNotifier;
 
 import java.util.Collection;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -37,12 +36,11 @@ import java.util.function.Supplier;
 public class MockSourceReader
 		extends SingleThreadMultiplexSourceReaderBase<int[], Integer, MockSourceSplit, AtomicInteger> {
 
-	public MockSourceReader(FutureNotifier futureNotifier,
-							FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
+	public MockSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue,
 							Supplier<SplitReader<int[], MockSourceSplit>> splitFetcherSupplier,
 							Configuration config,
 							SourceReaderContext context) {
-		super(futureNotifier, elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
+		super(elementsQueue, splitFetcherSupplier, new MockRecordEmitter(), config, context);
 	}
 
 	@Override
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index c1bde50..ef056d9e 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.runtime.io.AvailabilityProvider;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -30,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -42,10 +45,9 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testBasics() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
 
-		CompletableFuture<Void> future = futureNotifier.future();
+		CompletableFuture<Void> future = queue.getAvailabilityFuture();
 		assertTrue(queue.isEmpty());
 		assertEquals(0, queue.size());
 
@@ -66,8 +68,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testPoll() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
 		queue.put(0, 1234);
 		Integer value = queue.poll();
 		assertNotNull(value);
@@ -76,8 +77,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testWakeUpPut() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 1);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
 
 		CountDownLatch latch = new CountDownLatch(1);
 		new Thread(() -> {
@@ -97,8 +97,7 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testConcurrency() throws InterruptedException {
-		FutureNotifier futureNotifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(futureNotifier, 5);
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(5);
 		final int numValuesPerThread = 10000;
 		final int numPuttingThreads = 5;
 		List<Thread> threads = new ArrayList<>();
@@ -146,12 +145,21 @@ public class FutureCompletingBlockingQueueTest {
 
 	@Test
 	public void testFutureCompletingBlockingQueueConstructor() {
-		FutureNotifier notifier = new FutureNotifier();
-		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier);
-		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(notifier, SPECIFIED_CAPACITY);
+		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>();
+		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
 		// The capacity of the queue needs to be equal to 10000
 		assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
 		// The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
 		assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
 	}
+
+	/**
+	 * This test is to guard that our reflection is not broken and we do not lose the
+	 * performance advantage. This is possible, because the tests depend on the runtime modules
+	 * while the main scope does not.
+	 */
+	@Test
+	public void testQueueUsesShortCircuitFuture() {
+		assertSame(AvailabilityProvider.AVAILABLE, FutureCompletingBlockingQueue.AVAILABLE);
+	}
 }
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
deleted file mode 100644
index b257ebf..0000000
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureNotifierTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.source.reader.synchronization;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * The unit tests for {@link FutureNotifier}.
- */
-public class FutureNotifierTest {
-
-	@Test
-	public void testGetFuture() {
-		FutureNotifier notifier = new FutureNotifier();
-		CompletableFuture<Void> future = notifier.future();
-		// The future should not be null.
-		assertNotNull(future);
-		// Calling the future again should return the same future.
-		assertEquals(future, notifier.future());
-	}
-
-	@Test
-	public void testCompleteFuture() {
-		FutureNotifier notifier = new FutureNotifier();
-		CompletableFuture<Void> future = notifier.future();
-		assertFalse(future.isDone());
-		notifier.notifyComplete();
-		assertTrue(future.isDone());
-	}
-
-	@Test
-	public void testConcurrency() throws InterruptedException, ExecutionException {
-		final int times = 1_000_000;
-		final int nThreads = 5;
-		FutureNotifier notifier = new FutureNotifier();
-		// A thread pool that simply gets futures out of the notifier.
-		ExecutorService listenerExecutor = Executors.newFixedThreadPool(nThreads);
-		// A thread pool that completes the futures.
-		ExecutorService notifierExecutor = Executors.newFixedThreadPool(nThreads);
-
-		CountDownLatch runningListeners = new CountDownLatch(nThreads);
-		CountDownLatch startCommand = new CountDownLatch(1);
-		CountDownLatch finishLine = new CountDownLatch(1);
-
-		List<Future<?>> executionFutures = new ArrayList<>();
-		// Start nThreads thread getting futures out of the notifier.
-		for (int i = 0; i < nThreads; i++) {
-			executionFutures.add(listenerExecutor.submit(() -> {
-				try {
-					List<CompletableFuture<Void>> futures = new ArrayList<>(times);
-					startCommand.await();
-					for (int j = 0; j < times; j++) {
-						futures.add(notifier.future());
-					}
-					runningListeners.countDown();
-					// Wait for the notifying thread to finish.
-					finishLine.await();
-					// All the futures should have been completed.
-					futures.forEach(f -> {
-						assertNotNull(f);
-						assertTrue(f.isDone());
-					});
-				} catch (Exception e) {
-					fail();
-				}
-			}));
-		}
-
-		// Start nThreads thread notifying the completion.
-		for (int i = 0; i < nThreads; i++) {
-			notifierExecutor.submit(() -> {
-				try {
-					startCommand.await();
-					while (runningListeners.getCount() > 0) {
-						notifier.notifyComplete();
-					}
-					notifier.notifyComplete();
-					finishLine.countDown();
-				} catch (Exception e) {
-					fail();
-				}
-			});
-		}
-
-		// Kick off the threads.
-		startCommand.countDown();
-
-		try {
-			for (Future<?> executionFuture : executionFutures) {
-				executionFuture.get();
-			}
-		} finally {
-			listenerExecutor.shutdown();
-			notifierExecutor.shutdown();
-			listenerExecutor.awaitTermination(30L, TimeUnit.SECONDS);
-			notifierExecutor.awaitTermination(30L, TimeUnit.SECONDS);
-		}
-	}
-}


[flink] 05/11: [FLINK-19225][connectors] Various small improvements to SourceReaderBase

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7625760a75a508bf05bcddc380bb4d62ee1743e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Sep 14 12:26:47 2020 +0200

    [FLINK-19225][connectors] Various small improvements to SourceReaderBase
    
     - A slight improvement of the main loop, to avoid a branch and improve inlining friendliness.
    
     - Check for fetcher errors only when moving between fetches.
       This is still guaranteed to propagate the errors, just some micro- or milliseconds later.
       In return we save one or two volatile accesses on the hot per-record path.
    
     - Extend logging in SourceReaderBase to simplify the debugging in the case something goes wrong.
    
     - Add Task name to Split Fetcher Threads to make it easier to attribute the threads when
       analyzing stack traces.
---
 .../base/source/reader/SourceReaderBase.java       | 31 ++++++++++++++--------
 .../source/reader/fetcher/SplitFetcherManager.java |  3 ++-
 2 files changed, 22 insertions(+), 12 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 979afb2..0305e2d 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -116,12 +116,13 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	@Override
 	public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
-		splitFetcherManager.checkErrors();
-
 		// make sure we have a fetch we are working on, or move to the next
-		final RecordsWithSplitIds<E> recordsWithSplitId = getCurrentOrNewFetch(output);
+		RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
 		if (recordsWithSplitId == null) {
-			return trace(finishedOrAvailableLater());
+			recordsWithSplitId = getNextFetch(output);
+			if (recordsWithSplitId == null) {
+				return trace(finishedOrAvailableLater());
+			}
 		}
 
 		// we need to loop here, because we may have to go across splits
@@ -132,6 +133,12 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 				// emit the record.
 				recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
 				LOG.trace("Emitted record: {}", record);
+
+				// We always emit MORE_AVAILABLE here, even though we do not strictly know whether
+				// more is available. If nothing more is available, the next invocation will find
+				// this out and return the correct status.
+				// That means we emit the occasional 'false positive' for availability, but this
+				// saves us doing checks for every record. Ultimately, this is cheaper.
 				return trace(InputStatus.MORE_AVAILABLE);
 			}
 			else if (!moveToNextSplit(recordsWithSplitId, output)) {
@@ -147,13 +154,11 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	}
 
 	@Nullable
-	private RecordsWithSplitIds<E> getCurrentOrNewFetch(final ReaderOutput<T> output) {
-		RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
-		if (recordsWithSplitId != null) {
-			return recordsWithSplitId;
-		}
+	private RecordsWithSplitIds<E> getNextFetch(final ReaderOutput<T> output) {
+		splitFetcherManager.checkErrors();
 
-		recordsWithSplitId = elementsQueue.poll();
+		LOG.trace("Getting next source data batch from queue");
+		final RecordsWithSplitIds<E> recordsWithSplitId = elementsQueue.poll();
 		if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
 			// No element available, set to available later if needed.
 			return null;
@@ -170,6 +175,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 		final Set<String> finishedSplits = fetch.finishedSplits();
 		if (!finishedSplits.isEmpty()) {
+			LOG.info("Finished reading split(s) {}", finishedSplits);
 			for (String finishedSplitId : finishedSplits) {
 				splitStates.remove(finishedSplitId);
 				output.releaseOutputForSplit(finishedSplitId);
@@ -183,6 +189,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
 		final String nextSplitId = recordsWithSplitIds.nextSplit();
 		if (nextSplitId == null) {
+			LOG.trace("Current fetch is finished.");
 			finishCurrentFetch(recordsWithSplitIds, output);
 			return false;
 		}
@@ -190,6 +197,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 		currentSplitContext = splitStates.get(nextSplitId);
 		checkState(currentSplitContext != null, "Have records for a split that was not registered");
 		currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
+		LOG.trace("Emitting records from fetch for split {}", nextSplitId);
 		return true;
 	}
 
@@ -218,7 +226,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 
 	@Override
 	public void addSplits(List<SplitT> splits) {
-		LOG.trace("Adding splits {}", splits);
+		LOG.info("Adding split(s) to reader: {}", splits);
 		// Initialize the state for each split.
 		splits.forEach(s -> splitStates.put(s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
 		// Hand over the splits to the split fetcher to start fetch.
@@ -229,6 +237,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
 	public void handleSourceEvents(SourceEvent sourceEvent) {
 		LOG.trace("Handling source event: {}", sourceEvent);
 		if (sourceEvent instanceof NoMoreSplitsEvent) {
+			LOG.info("Reader received NoMoreSplits event.");
 			noMoreSplitsAssignment = true;
 			futureNotifier.notifyComplete();
 		}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
index 822a9a9..26d92e3 100644
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
+++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherManager.java
@@ -107,7 +107,8 @@ public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 
 		// Create the executor with a thread factory that fails the source reader if one of
 		// the fetcher thread exits abnormally.
-		this.executors = Executors.newCachedThreadPool(r -> new Thread(r, "SourceFetcher"));
+		final String taskThreadName = Thread.currentThread().getName();
+		this.executors = Executors.newCachedThreadPool(r -> new Thread(r, "Source Data Fetcher for " + taskThreadName));
 		this.closed = false;
 	}
 


[flink] 11/11: [hotfix][tests] Extend test coverage for FutureCompletingBlockingQueue.

Posted by se...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c60aaff0249bfd6b5871b7f82e03efc487a54d6b
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Sep 15 16:44:49 2020 +0200

    [hotfix][tests] Extend test coverage for FutureCompletingBlockingQueue.
---
 .../FutureCompletingBlockingQueueTest.java         | 91 +++++++++++++++++++---
 1 file changed, 82 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
index 2a191d2..cdfef25 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/synchronization/FutureCompletingBlockingQueueTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.connector.base.source.reader.synchronization;
 
+import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 
 import org.junit.Test;
@@ -32,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -40,8 +42,8 @@ import static org.junit.Assert.fail;
  * The unit test for {@link FutureCompletingBlockingQueue}.
  */
 public class FutureCompletingBlockingQueueTest {
-	private static final Integer DEFAULT_CAPACITY = 2;
-	private static final Integer SPECIFIED_CAPACITY = 20000;
+
+	private static final int DEFAULT_CAPACITY = 2;
 
 	@Test
 	public void testBasics() throws InterruptedException {
@@ -76,6 +78,16 @@ public class FutureCompletingBlockingQueueTest {
 	}
 
 	@Test
+	public void testPollEmptyQueue() throws InterruptedException {
+		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>();
+		queue.put(0, 1234);
+
+		assertNotNull(queue.poll());
+		assertNull(queue.poll());
+		assertNull(queue.poll());
+	}
+
+	@Test
 	public void testWakeUpPut() throws InterruptedException {
 		FutureCompletingBlockingQueue<Integer> queue = new FutureCompletingBlockingQueue<>(1);
 
@@ -144,13 +156,74 @@ public class FutureCompletingBlockingQueueTest {
 	}
 
 	@Test
-	public void testFutureCompletingBlockingQueueConstructor() {
-		FutureCompletingBlockingQueue<Object> defaultCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>();
-		FutureCompletingBlockingQueue<Object> specifiedCapacityFutureCompletingBlockingQueue = new FutureCompletingBlockingQueue<>(SPECIFIED_CAPACITY);
-		// The capacity of the queue needs to be equal to 10000
-		assertEquals(defaultCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) DEFAULT_CAPACITY);
-		// The capacity of the queue needs to be equal to SPECIFIED_CAPACITY
-		assertEquals(specifiedCapacityFutureCompletingBlockingQueue.remainingCapacity(), (int) SPECIFIED_CAPACITY);
+	public void testSpecifiedQueueCapacity() {
+		final int capacity = 8_000;
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>(capacity);
+		assertEquals(capacity, queue.remainingCapacity());
+	}
+
+	@Test
+	public void testQueueDefaultCapacity() {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		assertEquals(DEFAULT_CAPACITY, queue.remainingCapacity());
+		assertEquals(DEFAULT_CAPACITY, SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.defaultValue().intValue());
+	}
+
+	@Test
+	public void testUnavailableWhenEmpty() {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		assertFalse(queue.getAvailabilityFuture().isDone());
+	}
+
+	@Test
+	public void testImmediatelyAvailableAfterPut() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		queue.put(0, new Object());
+		assertTrue(queue.getAvailabilityFuture().isDone());
+	}
+
+	@Test
+	public void testFutureBecomesAvailableAfterPut() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
+		queue.put(0, new Object());
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void testUnavailableWhenBecomesEmpty() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		queue.put(0, new Object());
+		queue.poll();
+		assertFalse(queue.getAvailabilityFuture().isDone());
+	}
+
+	@Test
+	public void testAvailableAfterNotifyAvailable() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		queue.notifyAvailable();
+		assertTrue(queue.getAvailabilityFuture().isDone());
+	}
+
+	@Test
+	public void testFutureBecomesAvailableAfterNotifyAvailable() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		final CompletableFuture<?> future = queue.getAvailabilityFuture();
+		queue.notifyAvailable();
+		assertTrue(future.isDone());
+	}
+
+	@Test
+	public void testPollResetsAvailability() throws InterruptedException {
+		final FutureCompletingBlockingQueue<Object> queue = new FutureCompletingBlockingQueue<>();
+		queue.notifyAvailable();
+
+		final CompletableFuture<?> beforePoll = queue.getAvailabilityFuture();
+		queue.poll();
+		final CompletableFuture<?> afterPoll = queue.getAvailabilityFuture();
+
+		assertTrue(beforePoll.isDone());
+		assertFalse(afterPoll.isDone());
 	}
 
 	/**