You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/12 06:03:38 UTC

[flink] branch master updated (db488c0 -> 2c8ee78)

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from db488c0  [FLINK-13220][table-planner-blink] Add DDL support for blink planner
     new 130e797  [hotfix][network] Move private NoOpResultSubpartitionView class out of PartitionRequestQueueTest
     new 67bc355  [hotfix][network] Make BoundedBlockingSubpartitionType configurable
     new 2c8ee78  [FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../NettyShuffleEnvironmentOptions.java            |   7 +
 .../io/network/NettyShuffleServiceFactory.java     |   1 +
 .../partition/BoundedBlockingSubpartition.java     |   3 +-
 .../BoundedBlockingSubpartitionReader.java         |  43 ++++-
 .../partition/BoundedBlockingSubpartitionType.java |  13 ++
 .../runtime/io/network/partition/BoundedData.java  |  12 +-
 .../network/partition/FileChannelBoundedData.java  |  36 ++--
 .../FileChannelMemoryMappedBoundedData.java        |   2 +-
 .../network/partition/MemoryMappedBoundedData.java |   2 +-
 .../partition/NoOpResultSubpartitionView.java}     |  40 +++--
 .../network/partition/ResultPartitionFactory.java  |  16 +-
 .../NettyShuffleEnvironmentConfiguration.java      |  29 ++-
 .../io/network/NettyShuffleEnvironmentBuilder.java |   4 +-
 .../network/netty/PartitionRequestQueueTest.java   |  40 +----
 ...oundedBlockingSubpartitionAvailabilityTest.java | 158 +++++++++++++++++
 .../partition/BoundedBlockingSubpartitionTest.java |  38 +++-
 .../io/network/partition/BoundedDataTestBase.java  |   2 +-
 ...ener.java => CountingAvailabilityListener.java} |   8 +-
 .../partition/FileChannelBoundedDataTest.java      | 172 ++++++++++++++++++
 .../network/partition/ResultPartitionBuilder.java  |   8 +
 .../partition/ResultPartitionFactoryTest.java      |   1 +
 .../flink/test/runtime/FileBufferReaderITCase.java | 196 +++++++++++++++++++++
 22 files changed, 734 insertions(+), 97 deletions(-)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{blob/VoidBlobStore.java => io/network/partition/NoOpResultSubpartitionView.java} (60%)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
 copy flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/{NoOpBufferAvailablityListener.java => CountingAvailabilityListener.java} (81%)
 create mode 100644 flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java


[flink] 01/03: [hotfix][network] Move private NoOpResultSubpartitionView class out of PartitionRequestQueueTest

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 130e7970b5aefc064498f0d67811260face2e5e5
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Tue Jul 9 18:32:35 2019 +0800

    [hotfix][network] Move private NoOpResultSubpartitionView class out of PartitionRequestQueueTest
    
    NoOpResultSubpartitionView is defined as a private class in PartitionRequestQueueTest. Since it could be used more widely
    elsewhere in the future, it is reasonable to refactor it into a separate public class.
---
 .../partition/NoOpResultSubpartitionView.java      | 64 ++++++++++++++++++++++
 .../network/netty/PartitionRequestQueueTest.java   | 40 +-------------
 2 files changed, 65 insertions(+), 39 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
new file mode 100644
index 0000000..f3ba1e3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import javax.annotation.Nullable;
+
+/**
+ * A dummy implementation of the {@link ResultSubpartitionView}.
+ */
+public class NoOpResultSubpartitionView implements ResultSubpartitionView {
+
+	@Nullable
+	public ResultSubpartition.BufferAndBacklog getNextBuffer() {
+		return null;
+	}
+
+	@Override
+	public void notifyDataAvailable() {
+	}
+
+	@Override
+	public void releaseAllResources() {
+	}
+
+	@Override
+	public void notifySubpartitionConsumed() {
+	}
+
+	@Override
+	public boolean isReleased() {
+		return true;
+	}
+
+	@Override
+	public Throwable getFailureCause() {
+		return null;
+	}
+
+	@Override
+	public boolean nextBufferIsEvent() {
+		return false;
+	}
+
+	@Override
+	public boolean isAvailable() {
+		return false;
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 2deaa9b..a9e8662 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
+import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
@@ -369,43 +370,4 @@ public class PartitionRequestQueueTest {
 
 		return channelBlockingBuffer;
 	}
-
-	private static class NoOpResultSubpartitionView implements ResultSubpartitionView {
-		@Nullable
-		public BufferAndBacklog getNextBuffer() {
-			return null;
-		}
-
-		@Override
-		public void notifyDataAvailable() {
-		}
-
-		@Override
-		public void releaseAllResources() {
-		}
-
-		@Override
-		public void notifySubpartitionConsumed() {
-		}
-
-		@Override
-		public boolean isReleased() {
-			return true;
-		}
-
-		@Override
-		public Throwable getFailureCause() {
-			return null;
-		}
-
-		@Override
-		public boolean nextBufferIsEvent() {
-			return false;
-		}
-
-		@Override
-		public boolean isAvailable() {
-			return false;
-		}
-	}
 }


[flink] 02/03: [hotfix][network] Make BoundedBlockingSubpartitionType configurable

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 67bc355d0edda18563c57dc8b6f2ba61f066e872
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jul 10 12:18:30 2019 +0800

    [hotfix][network] Make BoundedBlockingSubpartitionType configurable
    
    The current BoundedBlockingSubpartitionType is selected internally based on the memory architecture, so it is difficult to cover
    all the type implementations in ITCases/tests. The config option "taskmanager.network.bounded-blocking-subpartition-type" is
    added to NettyShuffleEnvironmentOptions, only for ITCases/tests at the moment. BoundedBlockingSubpartitionType is also made
    configurable in ResultPartitionBuilder.
---
 .../NettyShuffleEnvironmentOptions.java            |  7 ++++++
 .../io/network/NettyShuffleServiceFactory.java     |  1 +
 .../partition/BoundedBlockingSubpartitionType.java | 13 ++++++++++
 .../network/partition/ResultPartitionFactory.java  | 16 +++++++-----
 .../NettyShuffleEnvironmentConfiguration.java      | 29 ++++++++++++++++++++--
 .../io/network/NettyShuffleEnvironmentBuilder.java |  4 ++-
 .../network/partition/ResultPartitionBuilder.java  |  8 ++++++
 .../partition/ResultPartitionFactoryTest.java      |  1 +
 8 files changed, 70 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index b8e470c..4ba4c8e 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -153,6 +153,13 @@ public class NettyShuffleEnvironmentOptions {
 					"tasks have occupied all the buffers and the downstream tasks are waiting for the exclusive buffers. The timeout breaks" +
 					"the tie by failing the request of exclusive buffers and ask users to increase the number of total buffers.");
 
+	@Documentation.ExcludeFromDocumentation("This option is only used for testing at the moment.")
+	public static final ConfigOption<String> NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE =
+		key("taskmanager.network.bounded-blocking-subpartition-type")
+			.defaultValue("auto")
+			.withDescription("The bounded blocking subpartition type, either \"mmap\" or \"file\". The default \"auto\" means selecting the" +
+					"property type automatically based on system memory architecture.");
+
 	// ------------------------------------------------------------------------
 	//  Netty Options
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 0dbfb94..431360a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -101,6 +101,7 @@ public class NettyShuffleServiceFactory implements ShuffleServiceFactory<NettySh
 			resultPartitionManager,
 			fileChannelManager,
 			networkBufferPool,
+			config.getBlockingSubpartitionType(),
 			config.networkBuffersPerChannel(),
 			config.floatingNetworkBuffersPerGate(),
 			config.networkBufferSize());
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
index 9b43264..63a1652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionType.java
@@ -64,6 +64,19 @@ public enum BoundedBlockingSubpartitionType {
 		public BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException {
 			return BoundedBlockingSubpartition.createWithFileAndMemoryMappedReader(index, parent, tempFile);
 		}
+	},
+
+	/**
+	 * Selects the BoundedBlockingSubpartition type based on the current memory architecture. If 64-bit,
+	 * the type of {@link BoundedBlockingSubpartitionType#FILE_MMAP} is recommended. Otherwise, the type
+	 * of {@link BoundedBlockingSubpartitionType#FILE} is by default.
+	 */
+	AUTO {
+
+		@Override
+		public BoundedBlockingSubpartition create(int index, ResultPartition parent, File tempFile, int readBufferSize) throws IOException {
+			return ResultPartitionFactory.getBoundedBlockingType().create(index, parent, tempFile, readBufferSize);
+		}
 	};
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index db8d234..4d5cf23 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -46,8 +46,6 @@ public class ResultPartitionFactory {
 
 	private static final Logger LOG = LoggerFactory.getLogger(ResultPartitionFactory.class);
 
-	private static final BoundedBlockingSubpartitionType BOUNDED_BLOCKING_TYPE = getBoundedBlockingType();
-
 	@Nonnull
 	private final ResultPartitionManager partitionManager;
 
@@ -57,6 +55,8 @@ public class ResultPartitionFactory {
 	@Nonnull
 	private final BufferPoolFactory bufferPoolFactory;
 
+	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
+
 	private final int networkBuffersPerChannel;
 
 	private final int floatingNetworkBuffersPerGate;
@@ -67,6 +67,7 @@ public class ResultPartitionFactory {
 		@Nonnull ResultPartitionManager partitionManager,
 		@Nonnull FileChannelManager channelManager,
 		@Nonnull BufferPoolFactory bufferPoolFactory,
+		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBuffersPerChannel,
 		int floatingNetworkBuffersPerGate,
 		int networkBufferSize) {
@@ -76,6 +77,7 @@ public class ResultPartitionFactory {
 		this.networkBuffersPerChannel = networkBuffersPerChannel;
 		this.floatingNetworkBuffersPerGate = floatingNetworkBuffersPerGate;
 		this.bufferPoolFactory = bufferPoolFactory;
+		this.blockingSubpartitionType = blockingSubpartitionType;
 		this.networkBufferSize = networkBufferSize;
 	}
 
@@ -123,7 +125,7 @@ public class ResultPartitionFactory {
 				partitionManager,
 				bufferPoolFactory);
 
-		createSubpartitions(partition, type, subpartitions);
+		createSubpartitions(partition, type, blockingSubpartitionType, subpartitions);
 
 		LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
 
@@ -133,12 +135,13 @@ public class ResultPartitionFactory {
 	private void createSubpartitions(
 			ResultPartition partition,
 			ResultPartitionType type,
+			BoundedBlockingSubpartitionType blockingSubpartitionType,
 			ResultSubpartition[] subpartitions) {
 
 		// Create the subpartitions.
 		switch (type) {
 			case BLOCKING:
-				initializeBoundedBlockingPartitions(subpartitions, partition, networkBufferSize, channelManager);
+				initializeBoundedBlockingPartitions(subpartitions, partition, blockingSubpartitionType, networkBufferSize, channelManager);
 				break;
 
 			case PIPELINED:
@@ -157,6 +160,7 @@ public class ResultPartitionFactory {
 	private static void initializeBoundedBlockingPartitions(
 		ResultSubpartition[] subpartitions,
 		ResultPartition parent,
+		BoundedBlockingSubpartitionType blockingSubpartitionType,
 		int networkBufferSize,
 		FileChannelManager channelManager) {
 
@@ -164,7 +168,7 @@ public class ResultPartitionFactory {
 		try {
 			for (; i < subpartitions.length; i++) {
 				final File spillFile = channelManager.createChannel().getPathFile();
-				subpartitions[i] = BOUNDED_BLOCKING_TYPE.create(i, parent, spillFile, networkBufferSize);
+				subpartitions[i] = blockingSubpartitionType.create(i, parent, spillFile, networkBufferSize);
 			}
 		}
 		catch (IOException e) {
@@ -201,7 +205,7 @@ public class ResultPartitionFactory {
 		};
 	}
 
-	private static BoundedBlockingSubpartitionType getBoundedBlockingType() {
+	static BoundedBlockingSubpartitionType getBoundedBlockingType() {
 		switch (MemoryArchitecture.get()) {
 			case _64_BIT:
 				return BoundedBlockingSubpartitionType.FILE_MMAP;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index c080349..e73cc6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import org.apache.flink.util.Preconditions;
@@ -70,6 +71,8 @@ public class NettyShuffleEnvironmentConfiguration {
 
 	private final String[] tempDirs;
 
+	private final BoundedBlockingSubpartitionType blockingSubpartitionType;
+
 	public NettyShuffleEnvironmentConfiguration(
 			int numNetworkBuffers,
 			int networkBufferSize,
@@ -81,7 +84,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			boolean isCreditBased,
 			boolean isNetworkDetailedMetrics,
 			@Nullable NettyConfig nettyConfig,
-			String[] tempDirs) {
+			String[] tempDirs,
+			BoundedBlockingSubpartitionType blockingSubpartitionType) {
 
 		this.numNetworkBuffers = numNetworkBuffers;
 		this.networkBufferSize = networkBufferSize;
@@ -94,6 +98,7 @@ public class NettyShuffleEnvironmentConfiguration {
 		this.isNetworkDetailedMetrics = isNetworkDetailedMetrics;
 		this.nettyConfig = nettyConfig;
 		this.tempDirs = Preconditions.checkNotNull(tempDirs);
+		this.blockingSubpartitionType = Preconditions.checkNotNull(blockingSubpartitionType);
 	}
 
 	// ------------------------------------------------------------------------
@@ -142,6 +147,10 @@ public class NettyShuffleEnvironmentConfiguration {
 		return tempDirs;
 	}
 
+	public BoundedBlockingSubpartitionType getBlockingSubpartitionType() {
+		return blockingSubpartitionType;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**
@@ -183,6 +192,8 @@ public class NettyShuffleEnvironmentConfiguration {
 		Duration requestSegmentsTimeout = Duration.ofMillis(configuration.getLong(
 				NettyShuffleEnvironmentOptions.NETWORK_EXCLUSIVE_BUFFERS_REQUEST_TIMEOUT_MILLISECONDS));
 
+		BoundedBlockingSubpartitionType blockingSubpartitionType = getBlockingSubpartitionType(configuration);
+
 		return new NettyShuffleEnvironmentConfiguration(
 			numberOfNetworkBuffers,
 			pageSize,
@@ -194,7 +205,8 @@ public class NettyShuffleEnvironmentConfiguration {
 			isCreditBased,
 			isNetworkDetailedMetrics,
 			nettyConfig,
-			tempDirs);
+			tempDirs,
+			blockingSubpartitionType);
 	}
 
 	/**
@@ -481,6 +493,19 @@ public class NettyShuffleEnvironmentConfiguration {
 		return nettyConfig;
 	}
 
+	private static BoundedBlockingSubpartitionType getBlockingSubpartitionType(Configuration config) {
+		String transport = config.getString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE);
+
+		switch (transport) {
+			case "mmap":
+				return BoundedBlockingSubpartitionType.FILE_MMAP;
+			case "file":
+				return BoundedBlockingSubpartitionType.FILE;
+			default:
+				return BoundedBlockingSubpartitionType.AUTO;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 7250136..2d360ba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration;
 import org.apache.flink.runtime.util.EnvironmentInformation;
@@ -141,7 +142,8 @@ public class NettyShuffleEnvironmentBuilder {
 				isCreditBased,
 				isNetworkDetailedMetrics,
 				nettyConfig,
-				tempDirs),
+				tempDirs,
+				BoundedBlockingSubpartitionType.AUTO),
 			taskManagerLocation,
 			taskEventDispatcher,
 			metricGroup);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 59e045e..8eb68d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -38,6 +38,8 @@ public class ResultPartitionBuilder {
 
 	private ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
 
+	private BoundedBlockingSubpartitionType blockingSubpartitionType = BoundedBlockingSubpartitionType.AUTO;
+
 	private int numberOfSubpartitions = 1;
 
 	private int numTargetKeyGroups = 1;
@@ -127,11 +129,17 @@ public class ResultPartitionBuilder {
 		return this;
 	}
 
+	public ResultPartitionBuilder setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType blockingSubpartitionType) {
+		this.blockingSubpartitionType = blockingSubpartitionType;
+		return this;
+	}
+
 	public ResultPartition build() {
 		ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
 			partitionManager,
 			channelManager,
 			networkBufferPool,
+			blockingSubpartitionType,
 			networkBuffersPerChannel,
 			floatingNetworkBuffersPerGate,
 			networkBufferSize);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 43a3673..b2a4d16 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -73,6 +73,7 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			new ResultPartitionManager(),
 			fileChannelManager,
 			new NetworkBufferPool(1, 64, 1),
+			BoundedBlockingSubpartitionType.AUTO,
 			1,
 			1,
 			64);


[flink] 03/03: [FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer

Posted by pn...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2c8ee78f714af4995f618c768661f5b638cd3025
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Jul 10 14:31:30 2019 +0800

    [FLINK-13100][network] Fix the bug of throwing IOException while FileChannelBoundedData#nextBuffer
    
    The implementation of FileChannelBoundedData#nextBuffer assumes that there is always an available buffer (otherwise an IOException is thrown),
    and that a pool of two buffers is always enough (before the third buffer is needed, the first one is expected to have been recycled already).
    But in the case of a pending flush operation (when the socket channel is not writable while the netty thread is calling writeAndFlush), the first
    buffer fetched from FileChannelBoundedData has not yet been recycled when the second buffer is fetched to trigger the next read-ahead, which
    breaks the above assumption.
    
    To fix this problem, read-ahead in FileChannelBoundedData is no longer assumed to always succeed. If no buffer is available to read
    the next data, the read-ahead is re-triggered when a buffer is recycled, via ResultSubpartitionView#notifyDataAvailable.
---
 .../partition/BoundedBlockingSubpartition.java     |   3 +-
 .../BoundedBlockingSubpartitionReader.java         |  43 ++++-
 .../runtime/io/network/partition/BoundedData.java  |  12 +-
 .../network/partition/FileChannelBoundedData.java  |  36 ++--
 .../FileChannelMemoryMappedBoundedData.java        |   2 +-
 .../network/partition/MemoryMappedBoundedData.java |   2 +-
 ...oundedBlockingSubpartitionAvailabilityTest.java | 158 +++++++++++++++++
 .../partition/BoundedBlockingSubpartitionTest.java |  38 +++-
 .../io/network/partition/BoundedDataTestBase.java  |   2 +-
 ...Test.java => CountingAvailabilityListener.java} |  21 +--
 .../partition/FileChannelBoundedDataTest.java      | 172 ++++++++++++++++++
 .../flink/test/runtime/FileBufferReaderITCase.java | 196 +++++++++++++++++++++
 12 files changed, 638 insertions(+), 47 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
index 7a74872..785e44c 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartition.java
@@ -198,9 +198,8 @@ final class BoundedBlockingSubpartition extends ResultSubpartition {
 
 			availability.notifyDataAvailable();
 
-			final BoundedData.Reader dataReader = data.createReader();
 			final BoundedBlockingSubpartitionReader reader = new BoundedBlockingSubpartitionReader(
-					this, dataReader, numDataBuffersWritten);
+					this, data, numDataBuffersWritten, availability);
 			readers.add(reader);
 			return reader;
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index f7536b9..63e5e22 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -37,6 +37,9 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
 	/** The result subpartition that we read. */
 	private final BoundedBlockingSubpartition parent;
 
+	/** The listener that is notified when there are available buffers for this subpartition view. */
+	private final BufferAvailabilityListener availabilityListener;
+
 	/** The next buffer (look ahead). Null once the data is depleted or reader is disposed. */
 	@Nullable
 	private Buffer nextBuffer;
@@ -57,16 +60,20 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
 	 */
 	BoundedBlockingSubpartitionReader(
 			BoundedBlockingSubpartition parent,
-			BoundedData.Reader dataReader,
-			int numDataBuffers) throws IOException {
-
-		checkArgument(numDataBuffers >= 0);
+			BoundedData data,
+			int numDataBuffers,
+			BufferAvailabilityListener availabilityListener) throws IOException {
 
 		this.parent = checkNotNull(parent);
-		this.dataReader = checkNotNull(dataReader);
-		this.dataBufferBacklog = numDataBuffers;
 
+		checkNotNull(data);
+		this.dataReader = data.createReader(this);
 		this.nextBuffer = dataReader.nextBuffer();
+
+		checkArgument(numDataBuffers >= 0);
+		this.dataBufferBacklog = numDataBuffers;
+
+		this.availabilityListener = checkNotNull(availabilityListener);
 	}
 
 	@Nullable
@@ -89,9 +96,31 @@ final class BoundedBlockingSubpartitionReader implements ResultSubpartitionView
 		return BufferAndBacklog.fromBufferAndLookahead(current, nextBuffer, dataBufferBacklog);
 	}
 
+	/**
+	 * This method is actually only meaningful for the {@link BoundedBlockingSubpartitionType#FILE}.
+	 *
+	 * <p>For the other types the {@link #nextBuffer} can not be ever set to null, so it is no need
+	 * to notify available via this method. But the implementation is also compatible with other
+	 * types even though called by mistake.
+	 */
 	@Override
 	public void notifyDataAvailable() {
-		throw new IllegalStateException("No data should become available on a blocking partition during consumption.");
+		if (nextBuffer == null) {
+			assert dataReader != null;
+
+			try {
+				nextBuffer = dataReader.nextBuffer();
+			} catch (IOException ex) {
+				// this exception wrapper is only for avoiding throwing IOException explicitly
+				// in relevant interface methods
+				throw new IllegalStateException("No data available while reading", ex);
+			}
+
+			// next buffer is null indicates the end of partition
+			if (nextBuffer != null) {
+				availabilityListener.notifyDataAvailable();
+			}
+		}
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
index 4d58cf8..a8681cc 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedData.java
@@ -34,7 +34,7 @@ import java.io.IOException;
  * through the {@link #writeBuffer(Buffer)} method.
  * The write phase is ended by calling {@link #finishWrite()}.
  * After the write phase is finished, the data can be read multiple times through readers created
- * via {@link #createReader()}.
+ * via {@link #createReader(ResultSubpartitionView)}.
  * Finally, the BoundedData is dropped / deleted by calling {@link #close()}.
  *
  * <h2>Thread Safety and Concurrency</h2>
@@ -60,7 +60,15 @@ interface BoundedData extends Closeable {
 	 * Gets a reader for the bounded data. Multiple readers may be created.
 	 * This call only succeeds once the write phase was finished via {@link #finishWrite()}.
 	 */
-	BoundedData.Reader createReader() throws IOException;
+	BoundedData.Reader createReader(ResultSubpartitionView subpartitionView) throws IOException;
+
+	/**
+	 * Gets a reader for the bounded data. Multiple readers may be created.
+	 * This call only succeeds once the write phase was finished via {@link #finishWrite()}.
+	 */
+	default BoundedData.Reader createReader() throws IOException {
+		return createReader(new NoOpResultSubpartitionView());
+	}
 
 	/**
 	 * Gets the number of bytes of all written data (including the metadata in the buffer headers).
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
index 50dca60..690ad7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedData.java
@@ -75,11 +75,11 @@ final class FileChannelBoundedData implements BoundedData {
 	}
 
 	@Override
-	public Reader createReader() throws IOException {
+	public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException {
 		checkState(!fileChannel.isOpen());
 
 		final FileChannel fc = FileChannel.open(filePath, StandardOpenOption.READ);
-		return new FileBufferReader(fc, memorySegmentSize);
+		return new FileBufferReader(fc, memorySegmentSize, subpartitionView);
 	}
 
 	@Override
@@ -117,7 +117,12 @@ final class FileChannelBoundedData implements BoundedData {
 
 		private final ArrayDeque<MemorySegment> buffers;
 
-		FileBufferReader(FileChannel fileChannel, int bufferSize) {
+		private final ResultSubpartitionView subpartitionView;
+
+		/** Whether the end of the file was reached; once set, recycled buffers no longer notify the view. */
+		private boolean isFinished;
+
+		FileBufferReader(FileChannel fileChannel, int bufferSize, ResultSubpartitionView subpartitionView) {
 			this.fileChannel = checkNotNull(fileChannel);
 			this.headerBuffer = BufferReaderWriterUtil.allocatedHeaderBuffer();
 			this.buffers = new ArrayDeque<>(NUM_BUFFERS);
@@ -125,26 +130,25 @@ final class FileChannelBoundedData implements BoundedData {
 			for (int i = 0; i < NUM_BUFFERS; i++) {
 				buffers.addLast(MemorySegmentFactory.allocateUnpooledOffHeapMemory(bufferSize, null));
 			}
+
+			this.subpartitionView = checkNotNull(subpartitionView);
 		}
 
 		@Nullable
 		@Override
 		public Buffer nextBuffer() throws IOException {
 			final MemorySegment memory = buffers.pollFirst();
+			if (memory == null) {
+				return null;
+			}
 
-			if (memory != null) {
-				final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this);
-				if (next != null) {
-					return next;
-				}
-				else {
-					recycle(memory);
-					return null;
-				}
+			final Buffer next = BufferReaderWriterUtil.readFromByteChannel(fileChannel, headerBuffer, memory, this);
+			if (next == null) {
+				isFinished = true;
+				recycle(memory);
 			}
 
-			throw new IOException("Bug in BoundedBlockingSubpartition with FILE data: " +
-					"Requesting new buffer before previous buffer returned.");
+			return next;
 		}
 
 		@Override
@@ -155,6 +159,10 @@ final class FileChannelBoundedData implements BoundedData {
 		@Override
 		public void recycle(MemorySegment memorySegment) {
 			buffers.addLast(memorySegment);
+			// a returned segment lets nextBuffer() read again, so tell the view unless the file is exhausted
+			if (!isFinished) {
+				subpartitionView.notifyDataAvailable();
+			}
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
index 4a71fcd..f22efd0 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/FileChannelMemoryMappedBoundedData.java
@@ -123,7 +123,7 @@ final class FileChannelMemoryMappedBoundedData implements BoundedData {
 	}
 
 	@Override
-	public BoundedData.Reader createReader() {
+	public BoundedData.Reader createReader(ResultSubpartitionView ignored) {
 		checkState(!fileChannel.isOpen());
 
 		final List<ByteBuffer> buffers = memoryMappedRegions.stream()
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
index 502c64c..e8718f5 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/MemoryMappedBoundedData.java
@@ -113,7 +113,7 @@ final class MemoryMappedBoundedData implements BoundedData {
 	}
 
 	@Override
-	public BufferSlicer createReader() {
+	public BufferSlicer createReader(ResultSubpartitionView ignored) {
 		assert currentBuffer == null;
 
 		final List<ByteBuffer> buffers = fullBuffers.stream()
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
new file mode 100644
index 0000000..915cf43
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the availability handling of a {@link BoundedBlockingSubpartition} whose availability is not
+ * constant, i.e. one backed by a file channel that only has a few buffers for reading ahead.
+ */
+public class BoundedBlockingSubpartitionAvailabilityTest {
+
+	@ClassRule
+	public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
+
+	private static final int BUFFER_SIZE = 32 * 1024;
+
+	@Test
+	public void testInitiallyAvailable() throws Exception {
+		final ResultSubpartition subpartition = createPartitionWithData(10);
+		final CountingAvailabilityListener listener = new CountingAvailabilityListener();
+
+		// test
+		final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
+
+		// assert
+		assertEquals(1, listener.numNotifications);
+
+		// cleanup
+		subpartitionView.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test
+	public void testUnavailableWhenBuffersExhausted() throws Exception {
+		// setup
+		final BoundedBlockingSubpartition subpartition = createPartitionWithData(100_000);
+		final CountingAvailabilityListener listener = new CountingAvailabilityListener();
+		final ResultSubpartitionView reader = subpartition.createReadView(listener);
+
+		// test
+		final List<BufferAndBacklog> data = drainAvailableData(reader);
+
+		// assert
+		assertFalse(reader.isAvailable());
+		assertFalse(data.get(data.size() - 1).isMoreAvailable());
+
+		// cleanup
+		reader.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test
+	public void testAvailabilityNotificationWhenBuffersReturn() throws Exception {
+		// setup
+		final ResultSubpartition subpartition = createPartitionWithData(100_000);
+		final CountingAvailabilityListener listener = new CountingAvailabilityListener();
+		final ResultSubpartitionView reader = subpartition.createReadView(listener);
+
+		// test
+		final List<BufferAndBacklog> data = drainAvailableData(reader);
+		data.get(0).buffer().recycleBuffer();
+		data.get(1).buffer().recycleBuffer();
+
+		// assert
+		assertTrue(reader.isAvailable());
+		assertEquals(2, listener.numNotifications); // one initial, one for new availability
+
+		// cleanup
+		reader.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test
+	public void testNotAvailableWhenEmpty() throws Exception {
+		// setup
+		final ResultSubpartition subpartition = createPartitionWithData(100_000);
+		final ResultSubpartitionView reader = subpartition.createReadView(new NoOpBufferAvailablityListener());
+
+		// test
+		drainAllData(reader);
+
+		// assert
+		assertFalse(reader.isAvailable());
+
+		// cleanup
+		reader.releaseAllResources();
+		subpartition.release();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static BoundedBlockingSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
+		ResultPartition parent = PartitionTestUtils.createPartition();
+
+		BoundedBlockingSubpartition partition = BoundedBlockingSubpartition.createWithFileChannel(
+			0, parent, new File(TMP_FOLDER.newFolder(), "data"), BUFFER_SIZE);
+
+		writeBuffers(partition, numberOfBuffers);
+		partition.finish();
+
+		return partition;
+	}
+
+	private static void writeBuffers(ResultSubpartition partition, int numberOfBuffers) throws IOException {
+		for (int i = 0; i < numberOfBuffers; i++) {
+			partition.add(BufferBuilderTestUtils.createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE));
+		}
+	}
+
+	private static List<BufferAndBacklog> drainAvailableData(ResultSubpartitionView reader) throws Exception {
+		final List<BufferAndBacklog> list = new ArrayList<>();
+
+		BufferAndBacklog bab;
+		while ((bab = reader.getNextBuffer()) != null) {
+			list.add(bab);
+		}
+
+		return list;
+	}
+
+	private static void drainAllData(ResultSubpartitionView reader) throws Exception {
+		BufferAndBacklog bab;
+		while ((bab = reader.getNextBuffer()) != null) {
+			bab.buffer().recycleBuffer();
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
index 9bd0c4b..ce4083f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionTest.java
@@ -35,6 +35,7 @@ import java.io.File;
 import java.io.IOException;
 
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -80,10 +81,11 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	@Test
-	public void testClosingClosesBoundedData() throws Exception {
+	public void testCloseBoundedData() throws Exception {
 		final TestingBoundedDataReader reader = new TestingBoundedDataReader();
+		final TestingBoundedData data = new TestingBoundedData(reader);
 		final BoundedBlockingSubpartitionReader bbspr = new BoundedBlockingSubpartitionReader(
-				(BoundedBlockingSubpartition) createSubpartition(), reader, 10);
+				(BoundedBlockingSubpartition) createSubpartition(), data, 10, new NoOpBufferAvailablityListener());
 
 		bbspr.releaseAllResources();
 
@@ -124,7 +126,7 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 		}
 
 		@Override
-		public Reader createReader() throws IOException {
+		public Reader createReader(ResultSubpartitionView subpartitionView) throws IOException {
 			throw new UnsupportedOperationException();
 		}
 
@@ -137,6 +139,36 @@ public class BoundedBlockingSubpartitionTest extends SubpartitionTestBase {
 		public void close() {}
 	}
 
+	private static class TestingBoundedData implements BoundedData {
+
+		private final BoundedData.Reader reader;
+
+		private TestingBoundedData(BoundedData.Reader reader) {
+			this.reader = checkNotNull(reader);
+		}
+
+		@Override
+		public void writeBuffer(Buffer buffer) throws IOException {
+		}
+
+		@Override
+		public void finishWrite() throws IOException {
+		}
+
+		@Override
+		public Reader createReader(ResultSubpartitionView ignored) throws IOException {
+			return reader;
+		}
+
+		@Override
+		public long getSize() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void close() {}
+	}
+
 	private static class TestingBoundedDataReader implements BoundedData.Reader {
 
 		boolean closed;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
index c71b9df..365e93e 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedDataTestBase.java
@@ -59,7 +59,7 @@ public abstract class BoundedDataTestBase {
 
 	protected abstract BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException;
 
-	private BoundedData createBoundedData() throws IOException {
+	protected BoundedData createBoundedData() throws IOException {
 		return createBoundedData(createTempPath());
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
old mode 100755
new mode 100644
similarity index 59%
copy from flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
copy to flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
index cc499f4..4e27ee0
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/CountingAvailabilityListener.java
@@ -18,26 +18,15 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import java.io.IOException;
-import java.nio.file.Path;
-
 /**
- * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel.
+ * A {@link BufferAvailabilityListener} that counts how often it was notified; the counter is not synchronized.
  */
-public class FileChannelBoundedDataTest extends BoundedDataTestBase {
-
-	@Override
-	protected boolean isRegionBased() {
-		return false;
-	}
+final class CountingAvailabilityListener implements BufferAvailabilityListener {
 
-	@Override
-	protected BoundedData createBoundedData(Path tempFilePath) throws IOException {
-		return FileChannelBoundedData.create(tempFilePath, BUFFER_SIZE);
-	}
+	int numNotifications;
 
 	@Override
-	protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException {
-		throw new UnsupportedOperationException();
+	public void notifyDataAvailable() {
+		numNotifications++;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index cc499f4..1ca2bc8 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -18,14 +18,45 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.io.disk.FileChannelManager;
+import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import java.io.IOException;
 import java.nio.file.Path;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSomeBuffer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests that read the BoundedBlockingSubpartition with multiple threads in parallel.
  */
 public class FileChannelBoundedDataTest extends BoundedDataTestBase {
 
+	private static final String TEMP_DIR = EnvironmentInformation.getTemporaryFileDirectory();
+
+	private static FileChannelManager fileChannelManager;
+
+	@BeforeClass
+	public static void setUp() {
+		fileChannelManager = new FileChannelManagerImpl(new String[] {TEMP_DIR}, "testing");
+	}
+
+	@AfterClass
+	public static void shutdown() throws Exception {
+		fileChannelManager.close();
+	}
+
 	@Override
 	protected boolean isRegionBased() {
 		return false;
@@ -40,4 +71,145 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase {
 	protected BoundedData createBoundedDataWithRegion(Path tempFilePath, int regionSize) throws IOException {
 		throw new UnsupportedOperationException();
 	}
+
+	@Test
+	public void testReadNextBuffer() throws Exception {
+		final int numberOfBuffers = 3;
+		try (final BoundedData data = createBoundedData()) {
+			writeBuffers(data, numberOfBuffers);
+
+			final BoundedData.Reader reader = data.createReader();
+			final Buffer buffer1 = reader.nextBuffer();
+			final Buffer buffer2 = reader.nextBuffer();
+
+			assertNotNull(buffer1);
+			assertNotNull(buffer2);
+			// there are only two available memory segments for reading data
+			assertNull(reader.nextBuffer());
+
+			// cleanup
+			buffer1.recycleBuffer();
+			buffer2.recycleBuffer();
+		}
+	}
+
+	@Test
+	public void testRecycleBufferForNotifyingSubpartitionView() throws Exception {
+		final int numberOfBuffers = 2;
+		try (final BoundedData data = createBoundedData()) {
+			writeBuffers(data, numberOfBuffers);
+
+			final VerifyNotificationResultSubpartitionView subpartitionView = new VerifyNotificationResultSubpartitionView();
+			final BoundedData.Reader reader = data.createReader(subpartitionView);
+			final Buffer buffer1 = reader.nextBuffer();
+			final Buffer buffer2 = reader.nextBuffer();
+			assertNotNull(buffer1);
+			assertNotNull(buffer2);
+
+			assertFalse(subpartitionView.isAvailable);
+			buffer1.recycleBuffer();
+			// the view is notified while recycling buffer if reader has not tagged finished
+			assertTrue(subpartitionView.isAvailable);
+
+			subpartitionView.resetAvailable();
+			assertFalse(subpartitionView.isAvailable);
+
+			// the next buffer is null to make reader tag finished
+			assertNull(reader.nextBuffer());
+
+			buffer2.recycleBuffer();
+			// the view is not notified while recycling buffer if reader already finished
+			assertFalse(subpartitionView.isAvailable);
+		}
+	}
+
+	@Test
+	public void testRecycleBufferForNotifyingBufferAvailabilityListener() throws Exception {
+		final ResultSubpartition subpartition = createFileBoundedBlockingSubpartition();
+		final int numberOfBuffers = 2;
+		writeBuffers(subpartition, numberOfBuffers);
+
+		final VerifyNotificationBufferAvailabilityListener listener = new VerifyNotificationBufferAvailabilityListener();
+		final ResultSubpartitionView subpartitionView = subpartition.createReadView(listener);
+		// the notification is triggered while creating view
+		assertTrue(listener.isAvailable);
+
+		listener.resetAvailable();
+		assertFalse(listener.isAvailable);
+
+		final BufferAndBacklog buffer1 = subpartitionView.getNextBuffer();
+		final BufferAndBacklog buffer2 = subpartitionView.getNextBuffer();
+		assertNotNull(buffer1);
+		assertNotNull(buffer2);
+
+		// the next buffer is null in view because FileBufferReader has no available buffers for reading ahead
+		assertFalse(subpartitionView.isAvailable());
+		// recycle a buffer to trigger notification of data available
+		buffer1.buffer().recycleBuffer();
+		assertTrue(listener.isAvailable);
+
+		// cleanup
+		buffer2.buffer().recycleBuffer();
+		subpartitionView.releaseAllResources();
+		subpartition.release();
+	}
+
+	private static ResultSubpartition createFileBoundedBlockingSubpartition() {
+		final ResultPartition resultPartition = new ResultPartitionBuilder()
+			.setNetworkBufferSize(BUFFER_SIZE)
+			.setResultPartitionType(ResultPartitionType.BLOCKING)
+			.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
+			.setFileChannelManager(fileChannelManager)
+			.build();
+		return resultPartition.subpartitions[0];
+	}
+
+	private static void writeBuffers(BoundedData data, int numberOfBuffers) throws IOException {
+		for (int i = 0; i < numberOfBuffers; i++) {
+			data.writeBuffer(buildSomeBuffer(BUFFER_SIZE));
+		}
+		data.finishWrite();
+	}
+
+	private static void writeBuffers(ResultSubpartition subpartition, int numberOfBuffers) throws IOException {
+		for (int i = 0; i < numberOfBuffers; i++) {
+			subpartition.add(createFilledBufferConsumer(BUFFER_SIZE, BUFFER_SIZE));
+		}
+		subpartition.finish();
+	}
+
+	/**
+	 * A subpartition view that records whether {@link ResultSubpartitionView#notifyDataAvailable()}
+	 * has been called since it was created or last reset via {@code resetAvailable()}.
+	 */
+	private static class VerifyNotificationResultSubpartitionView extends NoOpResultSubpartitionView {
+
+		private boolean isAvailable;
+
+		@Override
+		public void notifyDataAvailable() {
+			isAvailable = true;
+		}
+
+		private void resetAvailable() {
+			isAvailable = false;
+		}
+	}
+
+	/**
+	 * A listener that records whether it was notified, used to verify the notification logic behind {@link ResultSubpartitionView#notifyDataAvailable()}.
+	 */
+	private static class VerifyNotificationBufferAvailabilityListener implements BufferAvailabilityListener {
+
+		private boolean isAvailable;
+
+		@Override
+		public void notifyDataAvailable() {
+			isAvailable = true;
+		}
+
+		private void resetAvailable() {
+			isAvailable = false;
+		}
+	}
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
new file mode 100644
index 0000000..49d9ef5
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.runtime;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.RecordReader;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
+import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.testutils.serialization.types.ByteArrayType;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests the bug reported in FLINK-13100.
+ *
+ * <p>The implementation of {@link org.apache.flink.runtime.io.network.partition.BoundedData.Reader#nextBuffer()}
+ * for {@link BoundedBlockingSubpartitionType#FILE} used to assume that a buffer is always available and threw an
+ * IOException otherwise, i.e. it assumed that a pool of two buffers is enough (the first buffer was expected to be
+ * recycled before the third one is needed). But when a flush is pending (the socket channel is not writable
+ * while the netty thread calls {@link ChannelHandlerContext#writeAndFlush(Object, ChannelPromise)}), the first
+ * buffer fetched from {@link org.apache.flink.runtime.io.network.partition.FileChannelBoundedData} has not been
+ * recycled yet when the second buffer is fetched to trigger the next read-ahead, which breaks that assumption.
+ */
+public class FileBufferReaderITCase extends TestLogger {
+
+	private static final int PARALLELISM = 8;
+
+	private static final int NUM_RECORDS = 100_000;
+
+	private static final byte[] dataSource = new byte[1024];
+
+	@BeforeClass
+	public static void setup() {
+		for (int i = 0; i < dataSource.length; i++) {
+			dataSource[i] = 0;
+		}
+	}
+
+	@Test
+	public void testSequentialReading() throws Exception {
+		// setup
+		final Configuration configuration = new Configuration();
+		configuration.setString(RestOptions.BIND_PORT, "0");
+		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
+
+		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(configuration)
+			.setNumTaskManagers(PARALLELISM)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		try (final MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration)) {
+			miniCluster.start();
+
+			final MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
+			final JobGraph jobGraph = createJobGraph();
+			final CompletableFuture<JobSubmissionResult> submitFuture = client.submitJob(jobGraph);
+			// wait for the submission to succeed
+			final JobSubmissionResult result = submitFuture.get();
+
+			final CompletableFuture<JobResult> resultFuture = client.requestJobResult(result.getJobID());
+			final JobResult jobResult = resultFuture.get();
+
+			assertThat(jobResult.getSerializedThrowable().isPresent(), is(false));
+		}
+	}
+
+	private static JobGraph createJobGraph() {
+		final SlotSharingGroup group1 = new SlotSharingGroup();
+		final SlotSharingGroup group2 = new SlotSharingGroup();
+
+		final JobVertex source = new JobVertex("source");
+		source.setInvokableClass(TestSourceInvokable.class);
+		source.setParallelism(PARALLELISM);
+		source.setSlotSharingGroup(group1);
+
+		final JobVertex sink = new JobVertex("sink");
+		sink.setInvokableClass(TestSinkInvokable.class);
+		sink.setParallelism(PARALLELISM);
+		sink.setSlotSharingGroup(group2);
+
+		sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
+
+		final JobGraph jobGraph = new JobGraph(source, sink);
+		jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
+
+		return jobGraph;
+	}
+
+	/**
+	 * Basic source {@link AbstractInvokable} which sends the elements to the
+	 * {@link TestSinkInvokable}.
+	 */
+	public static final class TestSourceInvokable extends AbstractInvokable {
+
+		/**
+		 * Create an Invokable task and set its environment.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public TestSourceInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordWriter<ByteArrayType> writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0));
+
+			final ByteArrayType bytes = new ByteArrayType(dataSource);
+			int counter = 0;
+			while (counter++ < NUM_RECORDS) {
+				try {
+					writer.emit(bytes);
+					writer.flushAll();
+				} finally {
+					writer.clearBuffers();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Basic sink {@link AbstractInvokable} which verifies the sent elements
+	 * from the {@link TestSourceInvokable}.
+	 */
+	public static final class TestSinkInvokable extends AbstractInvokable {
+
+		private int numReceived = 0;
+
+		/**
+		 * Create an Invokable task and set its environment.
+		 *
+		 * @param environment The environment assigned to this invokable.
+		 */
+		public TestSinkInvokable(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			final RecordReader<ByteArrayType> reader = new RecordReader<>(
+				getEnvironment().getInputGate(0),
+				ByteArrayType.class,
+				getEnvironment().getTaskManagerInfo().getTmpDirectories());
+
+			while (reader.hasNext()) {
+				reader.next();
+				numReceived++;
+			}
+
+			assertThat(numReceived, is(NUM_RECORDS));
+		}
+	}
+}