Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:11 UTC

[flink] branch master updated (c3fab51 -> aa62e64)

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from c3fab51  [FLINK-19391][network] Moved notification during subpartition request to the requester.
     new 82f524b  [FLINK-19320][task] Remove RecordWriter#clearBuffers
     new 0c8a7c1  [hotfix] Remove outdated description in Javadoc of RecordWriter
     new 88a07a9  [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic
     new 900a896  [FLINK-19297][network] Make ResultPartitionWriter record-oriented
     new b7ed49d  [hotfix] Remove unused RecordWriterTest#TrackingBufferRecycler
     new 8ec4f1d  [FLINK-19302][network] Fix flushing BoundedBlockingResultPartition
     new aa62e64  [FLINK-19323][network] Small optimization of RecordWriter#serializeRecord

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/core/memory/DataOutputSerializer.java    |  11 +
 .../api/serialization/RecordSerializer.java        |  94 -----
 .../serialization/SpanningRecordSerializer.java    | 114 ------
 .../network/api/writer/BroadcastRecordWriter.java  | 138 +------
 .../api/writer/ChannelSelectorRecordWriter.java    |  87 +----
 .../io/network/api/writer/RecordWriter.java        | 176 ++-------
 .../network/api/writer/ResultPartitionWriter.java  |  55 ++-
 .../io/network/metrics/ResultPartitionMetrics.java |  32 +-
 .../partition/BoundedBlockingResultPartition.java  |  12 +-
 .../partition/BufferWritingResultPartition.java    | 311 +++++++++++++++
 .../partition/PipelinedResultPartition.java        |  16 +-
 .../io/network/partition/ResultPartition.java      | 142 ++-----
 .../apache/flink/runtime/operators/BatchTask.java  |   2 +-
 .../operators/shipping/OutputCollector.java        |   5 +-
 ...bleNotifyingResultPartitionWriterDecorator.java |  64 ++-
 .../SpanningRecordSerializationTest.java           |  65 ++--
 .../SpanningRecordSerializerTest.java              | 196 ----------
 .../AbstractCollectingResultPartitionWriter.java   |  70 +---
 .../api/writer/BroadcastRecordWriterTest.java      |  77 ++--
 .../RecordCollectingResultPartitionWriter.java     |   4 +-
 ...cordOrEventCollectingResultPartitionWriter.java |  35 +-
 .../api/writer/RecordWriterDelegateTest.java       |  53 ++-
 .../io/network/api/writer/RecordWriterTest.java    | 430 ++++-----------------
 .../network/netty/PartitionRequestQueueTest.java   |  21 +-
 ...oundedBlockingSubpartitionAvailabilityTest.java |   2 +-
 .../partition/FileChannelBoundedDataTest.java      |   2 +-
 .../network/partition/InputGateFairnessTest.java   |  12 +-
 .../partition/MockResultPartitionWriter.java       |  34 +-
 .../PartialConsumePipelinedResultTest.java         |   7 +-
 .../io/network/partition/PartitionTestUtils.java   |  12 -
 .../partition/PipelinedSubpartitionTest.java       |  26 +-
 .../partition/ResultPartitionFactoryTest.java      |   4 +-
 .../io/network/partition/ResultPartitionTest.java  | 293 +++++++++-----
 .../IteratorWrappingTestSingleInputGate.java       |  13 +-
 .../partition/consumer/LocalInputChannelTest.java  |  31 +-
 .../partition/consumer/SingleInputGateTest.java    |   6 +-
 .../io/network/util/TestPartitionProducer.java     |  16 +-
 .../io/network/util/TestProducerSource.java        |  14 +-
 .../io/network/util/TestSubpartitionProducer.java  |  13 +-
 .../SlotCountExceedingParallelismTest.java         |   2 +-
 .../scheduler/ScheduleOrUpdateConsumersTest.java   |   2 +-
 .../jobmaster/TestingAbstractInvokables.java       |   2 +-
 .../operators/testutils/MockEnvironment.java       |   2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala    |   4 +-
 .../consumer/StreamTestSingleInputGate.java        |  11 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  12 +-
 .../runtime/tasks/StreamMockEnvironment.java       |   2 -
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   4 +-
 .../flink/test/runtime/FileBufferReaderITCase.java |  17 +-
 .../test/runtime/NetworkStackThroughputITCase.java |   4 +-
 .../test/runtime/ShuffleCompressionITCase.java     |   2 +-
 .../PipelinedRegionSchedulingITCase.java           |   2 +-
 52 files changed, 1081 insertions(+), 1680 deletions(-)
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
 delete mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
 delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java


[flink] 07/07: [FLINK-19323][network] Small optimization of RecordWriter#serializeRecord

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aa62e64902a9dd3904a9049dcc9746682fb9f7fa
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Tue Sep 1 12:00:27 2020 +0800

    [FLINK-19323][network] Small optimization of RecordWriter#serializeRecord
    
    Currently, when serializing a record, the serializer first skips 4 bytes for the length field and then serializes the record. It then takes the serialized record length, skips back to position 0 to write the length field, and finally skips forward again to the tail of the serialized data. This patch avoids the last two skips by writing the length field to position 0 directly.
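    
    The same framing trick in plain java.nio, as a minimal standalone sketch
    (the frame helper below is illustrative, not Flink API): reserving the
    length field up front and patching it afterwards with an absolute put
    avoids repositioning the buffer twice.
    
        import java.nio.ByteBuffer;
    
        class LengthPrefixSketch {
            // Frames a payload as [int length][payload bytes].
            static ByteBuffer frame(byte[] payload) {
                ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
                buf.position(4);                // reserve 4 bytes for the length field
                buf.put(payload);               // serialize the record
                buf.putInt(0, payload.length);  // absolute put: position stays at the tail
                buf.flip();                     // ready to read: [length][payload]
                return buf;
            }
        }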
---
 .../org/apache/flink/core/memory/DataOutputSerializer.java    | 11 +++++++++++
 .../flink/runtime/io/network/api/writer/RecordWriter.java     |  9 ++-------
 2 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
index bae5a32..6742be4 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputSerializer.java
@@ -209,6 +209,13 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
 		this.position += 4;
 	}
 
+	public void writeIntUnsafe(int v, int pos) throws IOException {
+		if (LITTLE_ENDIAN) {
+			v = Integer.reverseBytes(v);
+		}
+		UNSAFE.putInt(this.buffer, BASE_OFFSET + pos, v);
+	}
+
 	@SuppressWarnings("restriction")
 	@Override
 	public void writeLong(long v) throws IOException {
@@ -345,6 +352,10 @@ public class DataOutputSerializer implements DataOutputView, MemorySegmentWritab
 		this.position = position;
 	}
 
+	public void setPositionUnsafe(int position) {
+		this.position = position;
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index b58ef38..59204cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -120,19 +120,14 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 	public static ByteBuffer serializeRecord(
 			DataOutputSerializer serializer,
 			IOReadableWritable record) throws IOException {
-		serializer.clear();
-
 		// the initial capacity should be no less than 4 bytes
-		serializer.skipBytesToWrite(4);
+		serializer.setPositionUnsafe(4);
 
 		// write data
 		record.write(serializer);
 
 		// write length
-		int len = serializer.length() - 4;
-		serializer.setPosition(0);
-		serializer.writeInt(len);
-		serializer.skipBytesToWrite(len);
+		serializer.writeIntUnsafe(serializer.length() - 4, 0);
 
 		return serializer.wrapAsByteBuffer();
 	}


[flink] 05/07: [hotfix] Remove unused RecordWriterTest#TrackingBufferRecycler

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7ed49d829ef031fb99834b377c2d598eedd3d1b
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 3 13:26:56 2020 +0800

    [hotfix] Remove unused RecordWriterTest#TrackingBufferRecycler
    
    This closes #13447
---
 .../runtime/io/network/api/writer/RecordWriterTest.java | 17 -----------------
 1 file changed, 17 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index a0956ec..e381544 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
@@ -39,7 +38,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
@@ -71,8 +69,6 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
@@ -490,17 +486,4 @@ public class RecordWriterTest {
 			in.readFully(bytes);
 		}
 	}
-
-	private static class TrackingBufferRecycler implements BufferRecycler {
-		private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
-
-		@Override
-		public synchronized void recycle(MemorySegment memorySegment) {
-			recycledMemorySegments.add(memorySegment);
-		}
-
-		public synchronized List<MemorySegment> getRecycledMemorySegments() {
-			return recycledMemorySegments;
-		}
-	}
 }


[flink] 02/07: [hotfix] Remove outdated description in Javadoc of RecordWriter

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0c8a7c1ea3470f1ba647a9ea891b4e0204cc5ac6
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 3 16:29:30 2020 +0800

    [hotfix] Remove outdated description in Javadoc of RecordWriter
    
    This closes #13447
---
 .../org/apache/flink/runtime/io/network/api/writer/RecordWriter.java | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index d5d47c5..995e670 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -53,11 +53,6 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
  * serializing records into buffers.
  *
- * <p><strong>Important</strong>: it is necessary to call {@link #flushAll()} after
- * all records have been written with {@link #emit(IOReadableWritable)}. This
- * ensures that all produced records are written to the output stream (incl.
- * partially filled ones).
- *
  * @param <T> the type of the record that can be emitted with this record writer
  */
 public abstract class RecordWriter<T extends IOReadableWritable> implements AvailabilityProvider {


[flink] 03/07: [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 88a07a9eb65514cf1874a27afc72bd430d87d11f
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Mon Aug 31 15:30:20 2020 +0800

    [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic
    
    In the current abstraction, buffers are written to and read from ResultSubpartitions, which is a hash-style data writing and reading implementation. This is in contrast to implementations where records are appended to a joint structure, from which the data is drawn after the write phase is finished, for example sort-based partitioning, which clusters data belonging to different channels by sorting on the channel index. In the future, a sort-merge based ResultPartitionWriter will be implemen [...]
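    
    To make the contrast concrete, a minimal sketch of the two write styles
    (all types and helpers below are illustrative stand-ins, not Flink API):
    
        import java.nio.ByteBuffer;
        import java.util.AbstractMap.SimpleEntry;
        import java.util.List;
        import java.util.Map;
    
        class WriteStyleSketch {
            interface Subpartition { void add(ByteBuffer buffer); }
    
            // Hash-style: each record is routed straight to its target
            // subpartition, i.e. one per-channel structure written directly.
            static void hashStyleEmit(Subpartition[] subpartitions, ByteBuffer record, int channel) {
                subpartitions[channel].add(record);
            }
    
            // Sort-based: records go into one joint structure tagged with the
            // channel index; only after the write phase is the data clustered
            // per channel (e.g. by sorting on the index) and served to readers.
            static void sortStyleEmit(List<Map.Entry<Integer, ByteBuffer>> joint, ByteBuffer record, int channel) {
                joint.add(new SimpleEntry<>(channel, record));
            }
        }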
---
 .../io/network/metrics/ResultPartitionMetrics.java |  32 ++--
 .../partition/BoundedBlockingResultPartition.java  |   2 +-
 .../partition/BufferWritingResultPartition.java    | 176 +++++++++++++++++++++
 .../partition/PipelinedResultPartition.java        |   6 +-
 .../io/network/partition/ResultPartition.java      | 121 +++-----------
 ...oundedBlockingSubpartitionAvailabilityTest.java |   2 +-
 .../partition/FileChannelBoundedDataTest.java      |   2 +-
 .../network/partition/InputGateFairnessTest.java   |  12 +-
 .../partition/ResultPartitionFactoryTest.java      |   4 +-
 .../io/network/partition/ResultPartitionTest.java  |   2 +-
 .../partition/consumer/LocalInputChannelTest.java  |   5 +-
 11 files changed, 227 insertions(+), 137 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
index 2171ff3..e3ba640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.metrics;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -49,13 +48,7 @@ public class ResultPartitionMetrics {
 	 * @return total number of queued buffers
 	 */
 	long refreshAndGetTotal() {
-		long total = 0;
-
-		for (ResultSubpartition part : partition.getAllPartitions()) {
-			total += part.unsynchronizedGetNumberOfQueuedBuffers();
-		}
-
-		return total;
+		return partition.getNumberOfQueuedBuffers();
 	}
 
 	/**
@@ -66,15 +59,15 @@ public class ResultPartitionMetrics {
 	 */
 	int refreshAndGetMin() {
 		int min = Integer.MAX_VALUE;
+		int numSubpartitions = partition.getNumberOfSubpartitions();
 
-		ResultSubpartition[] allPartitions = partition.getAllPartitions();
-		if (allPartitions.length == 0) {
+		if (numSubpartitions == 0) {
 			// meaningful value when no channels exist:
 			return 0;
 		}
 
-		for (ResultSubpartition part : allPartitions) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+		for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+			int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
 			min = Math.min(min, size);
 		}
 
@@ -89,9 +82,10 @@ public class ResultPartitionMetrics {
 	 */
 	int refreshAndGetMax() {
 		int max = 0;
+		int numSubpartitions = partition.getNumberOfSubpartitions();
 
-		for (ResultSubpartition part : partition.getAllPartitions()) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+		for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+			int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
 			max = Math.max(max, size);
 		}
 
@@ -105,15 +99,7 @@ public class ResultPartitionMetrics {
 	 * @return average number of queued buffers per sub-partition
 	 */
 	float refreshAndGetAvg() {
-		long total = 0;
-
-		ResultSubpartition[] allPartitions = partition.getAllPartitions();
-		for (ResultSubpartition part : allPartitions) {
-			int size = part.unsynchronizedGetNumberOfQueuedBuffers();
-			total += size;
-		}
-
-		return total / (float) allPartitions.length;
+		return partition.getNumberOfQueuedBuffers() / (float) partition.getNumberOfSubpartitions();
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
index fc440a0..a16cfe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * per sub-partition. This implementation hence requires at least as many files (file handles) and
  * memory buffers as the parallelism of the target task that the data is shuffled to.
  */
-public class BoundedBlockingResultPartition extends ResultPartition {
+public class BoundedBlockingResultPartition extends BufferWritingResultPartition {
 
 	public BoundedBlockingResultPartition(
 			String owningTaskName,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
new file mode 100644
index 0000000..437d0a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link ResultPartition} which writes buffers directly to {@link ResultSubpartition}s. This
+ * is in contrast to implementations where records are written to a joint structure, from which
+ * the subpartitions draw the data after the write phase is finished, for example the sort-based
+ * partitioning.
+ *
+ * <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
+ * transported through the network.
+ */
+public class BufferWritingResultPartition extends ResultPartition {
+
+	/** The subpartitions of this partition. At least one. */
+	protected final ResultSubpartition[] subpartitions;
+
+	public BufferWritingResultPartition(
+		String owningTaskName,
+		int partitionIndex,
+		ResultPartitionID partitionId,
+		ResultPartitionType partitionType,
+		ResultSubpartition[] subpartitions,
+		int numTargetKeyGroups,
+		ResultPartitionManager partitionManager,
+		@Nullable BufferCompressor bufferCompressor,
+		FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+
+		super(
+			owningTaskName,
+			partitionIndex,
+			partitionId,
+			partitionType,
+			subpartitions.length,
+			numTargetKeyGroups,
+			partitionManager,
+			bufferCompressor,
+			bufferPoolFactory);
+
+		this.subpartitions = checkNotNull(subpartitions);
+	}
+
+	public int getNumberOfQueuedBuffers() {
+		int totalBuffers = 0;
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
+		}
+
+		return totalBuffers;
+	}
+
+	public int getNumberOfQueuedBuffers(int targetSubpartition) {
+		checkArgument(targetSubpartition >= 0 && targetSubpartition < numSubpartitions);
+		return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
+	}
+
+	@Override
+	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
+		checkInProduceState();
+
+		return bufferPool.requestBufferBuilderBlocking(targetChannel);
+	}
+
+	@Override
+	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
+		return bufferPool.requestBufferBuilder(targetChannel);
+	}
+
+	@Override
+	public boolean addBufferConsumer(
+			BufferConsumer bufferConsumer,
+			int subpartitionIndex) throws IOException {
+		checkNotNull(bufferConsumer);
+
+		ResultSubpartition subpartition;
+		try {
+			checkInProduceState();
+			subpartition = subpartitions[subpartitionIndex];
+		}
+		catch (Exception ex) {
+			bufferConsumer.close();
+			throw ex;
+		}
+
+		return subpartition.add(bufferConsumer);
+	}
+
+	@Override
+	public void flushAll() {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
+	}
+
+	@Override
+	public void flush(int targetSubpartition) {
+		subpartitions[targetSubpartition].flush();
+	}
+
+	@Override
+	public ResultSubpartitionView createSubpartitionView(
+			int subpartitionIndex,
+			BufferAvailabilityListener availabilityListener) throws IOException {
+		checkElementIndex(subpartitionIndex, numSubpartitions, "Subpartition not found.");
+		checkState(!isReleased(), "Partition released.");
+
+		ResultSubpartition subpartition = subpartitions[subpartitionIndex];
+		ResultSubpartitionView readView = subpartition.createReadView(availabilityListener);
+
+		LOG.debug("Created {}", readView);
+
+		return readView;
+	}
+
+	@Override
+	public void finish() throws IOException {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.finish();
+		}
+		super.finish();
+	}
+
+	@Override
+	protected void releaseInternal() {
+		// Release all subpartitions
+		for (ResultSubpartition subpartition : subpartitions) {
+			try {
+				subpartition.release();
+			}
+			// Catch this in order to ensure that release is called on all subpartitions
+			catch (Throwable t) {
+				LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+			}
+		}
+	}
+
+	@VisibleForTesting
+	public ResultSubpartition[] getAllPartitions() {
+		return subpartitions;
+	}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index dc9da76..a1888c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  * {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
  * freed.
  */
-public class PipelinedResultPartition extends ResultPartition
+public class PipelinedResultPartition extends BufferWritingResultPartition
 		implements CheckpointedResultPartition, ChannelStateHolder {
 
 	/** The lock that guard release operations (which can be asynchronously propagated from the
@@ -134,12 +134,12 @@ public class PipelinedResultPartition extends ResultPartition
 
 	@Override
 	public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
-		return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
+		return (CheckpointedResultSubpartition) subpartitions[subpartitionIndex];
 	}
 
 	@Override
 	public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
-		for (ResultSubpartition subPar : getAllPartitions()) {
+		for (ResultSubpartition subPar : subpartitions) {
 			((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
 		}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 15430b9..02c91c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,9 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -43,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -52,8 +49,8 @@ import static org.apache.flink.util.Preconditions.checkState;
  *
  * <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
  * a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
- * or more {@link ResultSubpartition} instances, which further partition the data depending on the
- * number of consuming tasks and the data {@link DistributionPattern}.
+ * or more {@link ResultSubpartition} instances or in a joint structure which further partition the
+ * data depending on the number of consuming tasks and the data {@link DistributionPattern}.
  *
  * <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
  * happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
@@ -84,18 +81,17 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	/** Type of this partition. Defines the concrete subpartition implementation to use. */
 	protected final ResultPartitionType partitionType;
 
-	/** The subpartitions of this partition. At least one. */
-	protected final ResultSubpartition[] subpartitions;
-
 	protected final ResultPartitionManager partitionManager;
 
+	protected final int numSubpartitions;
+
 	private final int numTargetKeyGroups;
 
 	// - Runtime state --------------------------------------------------------
 
 	private final AtomicBoolean isReleased = new AtomicBoolean();
 
-	private BufferPool bufferPool;
+	protected BufferPool bufferPool;
 
 	private boolean isFinished;
 
@@ -112,7 +108,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		int partitionIndex,
 		ResultPartitionID partitionId,
 		ResultPartitionType partitionType,
-		ResultSubpartition[] subpartitions,
+		int numSubpartitions,
 		int numTargetKeyGroups,
 		ResultPartitionManager partitionManager,
 		@Nullable BufferCompressor bufferCompressor,
@@ -123,7 +119,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		this.partitionIndex = partitionIndex;
 		this.partitionId = checkNotNull(partitionId);
 		this.partitionType = checkNotNull(partitionType);
-		this.subpartitions = checkNotNull(subpartitions);
+		this.numSubpartitions = numSubpartitions;
 		this.numTargetKeyGroups = numTargetKeyGroups;
 		this.partitionManager = checkNotNull(partitionManager);
 		this.bufferCompressor = bufferCompressor;
@@ -164,22 +160,22 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 
 	@Override
 	public int getNumberOfSubpartitions() {
-		return subpartitions.length;
+		return numSubpartitions;
 	}
 
 	public BufferPool getBufferPool() {
 		return bufferPool;
 	}
 
-	public int getNumberOfQueuedBuffers() {
-		int totalBuffers = 0;
-
-		for (ResultSubpartition subpartition : subpartitions) {
-			totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
-		}
+	/**
+	 * Returns the total number of queued buffers of all subpartitions.
+	 */
+	public abstract int getNumberOfQueuedBuffers();
 
-		return totalBuffers;
-	}
+	/**
+	 * Returns the number of queued buffers of the given target subpartition.
+	 */
+	public abstract int getNumberOfQueuedBuffers(int targetSubpartition);
 
 	/**
 	 * Returns the type of this result partition.
@@ -192,48 +188,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 
 	// ------------------------------------------------------------------------
 
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkInProduceState();
-
-		return bufferPool.requestBufferBuilderBlocking(targetChannel);
-	}
-
-	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetChannel);
-		return bufferBuilder;
-	}
-
-	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
-		checkNotNull(bufferConsumer);
-
-		ResultSubpartition subpartition;
-		try {
-			checkInProduceState();
-			subpartition = subpartitions[subpartitionIndex];
-		}
-		catch (Exception ex) {
-			bufferConsumer.close();
-			throw ex;
-		}
-
-		return subpartition.add(bufferConsumer);
-	}
-
-	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
-		}
-	}
-
-	@Override
-	public void flush(int subpartitionIndex) {
-		subpartitions[subpartitionIndex].flush();
-	}
-
 	/**
 	 * Finishes the result partition.
 	 *
@@ -245,10 +199,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	public void finish() throws IOException {
 		checkInProduceState();
 
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.finish();
-		}
-
 		isFinished = true;
 	}
 
@@ -268,19 +218,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 				this.cause = cause;
 			}
 
-			// Release all subpartitions
-			for (ResultSubpartition subpartition : subpartitions) {
-				try {
-					subpartition.release();
-				}
-				// Catch this in order to ensure that release is called on all subpartitions
-				catch (Throwable t) {
-					LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
-				}
-			}
+			releaseInternal();
 		}
 	}
 
+	/**
+	 * Releases all produced data including both those stored in memory and persisted on disk.
+	 */
+	protected abstract void releaseInternal();
+
 	@Override
 	public void close() {
 		if (bufferPool != null) {
@@ -293,21 +239,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		partitionManager.releasePartition(partitionId, throwable);
 	}
 
-	/**
-	 * Returns the requested subpartition.
-	 */
-	@Override
-	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		checkElementIndex(index, subpartitions.length, "Subpartition not found.");
-		checkState(!isReleased.get(), "Partition released.");
-
-		ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
-
-		LOG.debug("Created {}", readView);
-
-		return readView;
-	}
-
 	public Throwable getFailureCause() {
 		return cause;
 	}
@@ -346,7 +277,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	@Override
 	public String toString() {
 		return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
-				+ subpartitions.length + " subpartitions]";
+				+ numSubpartitions + " subpartitions]";
 	}
 
 	// ------------------------------------------------------------------------
@@ -364,13 +295,9 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 				this, subpartitionIndex);
 	}
 
-	public ResultSubpartition[] getAllPartitions() {
-		return subpartitions;
-	}
-
 	// ------------------------------------------------------------------------
 
-	private void checkInProduceState() throws IllegalStateException {
+	protected void checkInProduceState() throws IllegalStateException {
 		checkState(!isFinished, "Partition already finished.");
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
index 8a81dc8..fd1188c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -122,7 +122,7 @@ public class BoundedBlockingSubpartitionAvailabilityTest {
 	// ------------------------------------------------------------------------
 
 	private static ResultSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
-		ResultPartition parent = new ResultPartitionBuilder()
+		BoundedBlockingResultPartition parent = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
 			.setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
 			.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
 			.setFileChannelManager(new FileChannelManagerImpl(new String[] { TMP_FOLDER.newFolder().toString() }, "data"))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index aa209ec..559d13d 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -152,7 +152,7 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase {
 	}
 
 	private static ResultSubpartition createFileBoundedBlockingSubpartition() {
-		final ResultPartition resultPartition = new ResultPartitionBuilder()
+		final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
 			.setNetworkBufferSize(BUFFER_SIZE)
 			.setResultPartitionType(ResultPartitionType.BLOCKING)
 			.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index e770a65..b6fadf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -64,9 +64,9 @@ public class InputGateFairnessTest {
 		final int numberOfChannels = 37;
 		final int buffersPerChannel = 27;
 
-		ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
-			.mapToObj(i -> new ResultPartitionBuilder().build())
-			.toArray(ResultPartition[]::new);
+		PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+			.mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+			.toArray(PipelinedResultPartition[]::new);
 		final BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42);
 
 		// ----- create some source channels and fill them with buffers -----
@@ -124,9 +124,9 @@ public class InputGateFairnessTest {
 		final int numberOfChannels = 37;
 		final int buffersPerChannel = 27;
 
-		ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
-			.mapToObj(i -> new ResultPartitionBuilder().build())
-			.toArray(ResultPartition[]::new);
+		PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+			.mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+			.toArray(PipelinedResultPartition[]::new);
 		try (BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42)) {
 
 			// ----- create some source channels and fill them with one buffer each -----
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 61af900..903878b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -61,13 +61,13 @@ public class ResultPartitionFactoryTest extends TestLogger {
 
 	@Test
 	public void testBoundedBlockingSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
+		final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) createResultPartition(ResultPartitionType.BLOCKING);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
 	}
 
 	@Test
 	public void testPipelinedSubpartitionsCreated() {
-		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
+		final PipelinedResultPartition resultPartition = (PipelinedResultPartition) createResultPartition(ResultPartitionType.PIPELINED);
 		Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 7f419ba..e293422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -95,7 +95,7 @@ public class ResultPartitionTest {
 		final int numSubpartitions = 3;
 
 		for (int i = 0; i < numPartitions; i++) {
-			final ResultPartition partition = new ResultPartitionBuilder()
+			final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
 				.setResultPartitionIndex(i)
 				.setNumberOfSubpartitions(numSubpartitions)
 				.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d0bb400..6e65ad74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -461,7 +462,7 @@ public class LocalInputChannelTest {
 	public void testCheckpointingInflightData() throws Exception {
 		SingleInputGate inputGate = new SingleInputGateBuilder().build();
 
-		ResultPartition parent = PartitionTestUtils.createPartition(
+		PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
 			ResultPartitionType.PIPELINED,
 			NoOpFileChannelManager.INSTANCE);
 		ResultSubpartition subpartition = parent.getAllPartitions()[0];
@@ -501,7 +502,7 @@ public class LocalInputChannelTest {
 
 	private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) throws IOException {
 		int bufferSize = 4096;
-		ResultPartition parent = PartitionTestUtils.createPartition(
+		PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
 			ResultPartitionType.PIPELINED,
 			NoOpFileChannelManager.INSTANCE,
 			true,


[flink] 06/07: [FLINK-19302][network] Fix flushing BoundedBlockingResultPartition

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ec4f1d8d2c3b2f1273d529cd67513c2f68b3656
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 3 13:35:10 2020 +0800

    [FLINK-19302][network] Fix flushing BoundedBlockingResultPartition
    
    Currently, when flushing the BoundedBlockingSubpartition, the unfinished BufferConsumer is closed and recycled; however, the corresponding BufferBuilder is not finished, so the writer can keep copying records into it, which can lead to data loss. This patch fixes the issue by finishing the corresponding BufferBuilders first when flushing a BoundedBlockingResultPartition.
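    
    A minimal sketch of the failure mode and the fix (the types below are
    illustrative stand-ins, not Flink API): the open builder must be finished
    before flush hands its buffer to the read side, otherwise later writes
    land in a buffer the reader has already recycled.
    
        class BlockingFlushSketch {
            static class BufferBuilderStub {
                boolean finished;
                void finish() { finished = true; } // no further records land here
            }
    
            private BufferBuilderStub current; // currently open builder, or null
    
            void flush() {
                if (current != null) {
                    current.finish(); // finish FIRST, so the writer starts a fresh buffer
                    current = null;
                }
                // ... only now is it safe to close/recycle the buffer on the read side ...
            }
        }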
---
 .../partition/BoundedBlockingResultPartition.java  | 10 ++++++
 .../partition/BufferWritingResultPartition.java    | 24 ++++++++-----
 .../partition/PipelinedResultPartition.java        | 10 ++++++
 .../io/network/partition/ResultPartitionTest.java  | 41 +++++++++++++++++++++-
 4 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
index a16cfe2..b98b568 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
@@ -63,6 +63,16 @@ public class BoundedBlockingResultPartition extends BufferWritingResultPartition
 			bufferPoolFactory);
 	}
 
+	@Override
+	public void flush(int targetSubpartition) {
+		flushSubpartition(targetSubpartition, true);
+	}
+
+	@Override
+	public void flushAll() {
+		flushAllSubpartitions(true);
+	}
+
 	private static ResultPartitionType checkResultPartitionType(ResultPartitionType type) {
 		checkArgument(type == ResultPartitionType.BLOCKING || type == ResultPartitionType.BLOCKING_PERSISTENT);
 		return type;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index bd9e8ad..dac509e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
  * transported through the network.
  */
-public class BufferWritingResultPartition extends ResultPartition {
+public abstract class BufferWritingResultPartition extends ResultPartition {
 
 	/** The subpartitions of this partition. At least one. */
 	protected final ResultSubpartition[] subpartitions;
@@ -105,18 +105,26 @@ public class BufferWritingResultPartition extends ResultPartition {
 		return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
 	}
 
-	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
+	protected void flushSubpartition(int targetSubpartition, boolean finishProducers) {
+		if (finishProducers) {
+			finishBroadcastBufferBuilder();
+			finishSubpartitionBufferBuilder(targetSubpartition);
 		}
-	}
 
-	@Override
-	public void flush(int targetSubpartition) {
 		subpartitions[targetSubpartition].flush();
 	}
 
+	protected void flushAllSubpartitions(boolean finishProducers) {
+		if (finishProducers) {
+			finishBroadcastBufferBuilder();
+			finishSubpartitionBufferBuilders();
+		}
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
+	}
+
 	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
 		do {
 			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index a1888c1..236dde0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -147,6 +147,16 @@ public class PipelinedResultPartition extends BufferWritingResultPartition
 	}
 
 	@Override
+	public void flushAll() {
+		flushAllSubpartitions(false);
+	}
+
+	@Override
+	public void flush(int targetSubpartition) {
+		flushSubpartition(targetSubpartition, false);
+	}
+
+	@Override
 	@SuppressWarnings("FieldAccessNotGuarded")
 	public String toString() {
 		return "PipelinedResultPartition " + partitionId.toString() + " [" + partitionType + ", "
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index d221e99..b261b18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -449,7 +449,7 @@ public class ResultPartitionTest {
 		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(10)
 			.setBufferSize(bufferSize).build();
-		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1);
+		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 2);
 		resultPartition.setup();
 		return (BufferWritingResultPartition) resultPartition;
 	}
@@ -611,6 +611,45 @@ public class ResultPartitionTest {
 		assertNotNull(readView.getNextBuffer().buffer());
 	}
 
+	@Test
+	public void testFlushBoundedBlockingResultPartition() throws IOException {
+		int value = 1024;
+		ResultPartition partition = createResultPartition(ResultPartitionType.BLOCKING);
+
+		ByteBuffer record = ByteBuffer.allocate(4);
+		record.putInt(value);
+
+		record.rewind();
+		partition.emitRecord(record, 0);
+		partition.flush(0);
+
+		record.rewind();
+		partition.emitRecord(record, 0);
+
+		record.rewind();
+		partition.broadcastRecord(record);
+		partition.flushAll();
+
+		record.rewind();
+		partition.broadcastRecord(record);
+		partition.finish();
+		record.rewind();
+
+		ResultSubpartitionView readView1 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		for (int i = 0; i < 4; ++i) {
+			assertEquals(record, readView1.getNextBuffer().buffer().getNioBufferReadable());
+		}
+		assertFalse(readView1.getNextBuffer().buffer().isBuffer());
+		assertNull(readView1.getNextBuffer());
+
+		ResultSubpartitionView readView2 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+		for (int i = 0; i < 2; ++i) {
+			assertEquals(record, readView2.getNextBuffer().buffer().getNioBufferReadable());
+		}
+		assertFalse(readView2.getNextBuffer().buffer().isBuffer());
+		assertNull(readView2.getNextBuffer());
+	}
+
 	/**
 	 * The {@link ChannelStateReader} instance for restoring the specific number of states.
 	 */


[flink] 01/07: [FLINK-19320][task] Remove RecordWriter#clearBuffers

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 82f524baec6b7b5ce9ce4a4940ece71b6e2da1e2
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Tue Sep 1 10:56:53 2020 +0800

    [FLINK-19320][task] Remove RecordWriter#clearBuffers
    
    Previously, RecordWriter#clearBuffers was used to recycle the partially filled buffer in the serializer. However, the serializer no longer holds any network buffers. The method is now only used to finish the current BufferBuilders, and only some tests and BatchTask call it. This usage should be replaced by RecordWriter#close, which does the same thing. This patch therefore removes RecordWriter#clearBuffers and the corresponding test cases.
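    
    In caller terms (mirroring the BatchTask change in the diff below),
    shutdown paths now simply close the writer, which finishes the open
    BufferBuilders and stops the output flusher:
    
        public static void clearWriters(List<RecordWriter<?>> writers) {
            for (RecordWriter<?> writer : writers) {
                writer.close(); // finishes open BufferBuilders and stops the flusher
            }
        }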
---
 .../network/api/writer/BroadcastRecordWriter.java  | 11 +--
 .../api/writer/ChannelSelectorRecordWriter.java    | 17 ++---
 .../io/network/api/writer/RecordWriter.java        | 11 +--
 .../apache/flink/runtime/operators/BatchTask.java  |  2 +-
 .../operators/shipping/OutputCollector.java        |  2 +-
 .../io/network/api/writer/RecordWriterTest.java    | 86 ----------------------
 .../SlotCountExceedingParallelismTest.java         |  2 +-
 .../scheduler/ScheduleOrUpdateConsumersTest.java   |  2 +-
 .../jobmaster/TestingAbstractInvokables.java       |  2 +-
 .../apache/flink/runtime/jobmanager/Tasks.scala    |  4 +-
 .../flink/test/runtime/FileBufferReaderITCase.java | 17 +++--
 .../test/runtime/NetworkStackThroughputITCase.java |  4 +-
 .../test/runtime/ShuffleCompressionITCase.java     |  2 +-
 .../PipelinedRegionSchedulingITCase.java           |  2 +-
 14 files changed, 31 insertions(+), 133 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 132fefa..b834738 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -163,16 +163,7 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
 	}
 
 	@Override
-	public void closeBufferBuilder(int targetChannel) {
-		closeBufferBuilder();
-	}
-
-	@Override
-	public void clearBuffers() {
-		closeBufferBuilder();
-	}
-
-	private void closeBufferBuilder() {
+	public void closeBufferBuilders() {
 		if (bufferBuilder != null) {
 			bufferBuilder.finish();
 			bufferBuilder = null;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 2e14988..5e32056 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -116,17 +116,12 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 	}
 
 	@Override
-	public void closeBufferBuilder(int targetChannel) {
-		if (bufferBuilders[targetChannel] != null) {
-			bufferBuilders[targetChannel].finish();
-			bufferBuilders[targetChannel] = null;
-		}
-	}
-
-	@Override
-	public void clearBuffers() {
-		for (int index = 0; index < numberOfChannels; index++) {
-			closeBufferBuilder(index);
+	public void closeBufferBuilders() {
+		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
+			if (bufferBuilders[targetChannel] != null) {
+				bufferBuilders[targetChannel].finish();
+				bufferBuilders[targetChannel] = null;
+			}
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 2c34993..d5d47c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -236,20 +236,15 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 	abstract void emptyCurrentBufferBuilder(int targetChannel);
 
 	/**
-	 * Marks the current {@link BufferBuilder} as finished and releases the resources for the target channel.
+	 * Marks the current {@link BufferBuilder}s as finished and releases the resources.
 	 */
-	abstract void closeBufferBuilder(int targetChannel);
-
-	/**
-	 * Closes the {@link BufferBuilder}s for all the channels.
-	 */
-	public abstract void clearBuffers();
+	abstract void closeBufferBuilders();
 
 	/**
 	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */
 	public void close() {
-		clearBuffers();
+		closeBufferBuilders();
 		// make sure we terminate the thread in any case
 		if (outputFlusher != null) {
 			outputFlusher.terminate();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index b336fcf..f43359c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -1484,7 +1484,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 	public static void clearWriters(List<RecordWriter<?>> writers) {
 		for (RecordWriter<?> writer : writers) {
-			writer.clearBuffers();
+			writer.close();
 		}
 	}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index 7fc8942..d8cd9ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -81,7 +81,7 @@ public class OutputCollector<T> implements Collector<T> {
 	@Override
 	public void close() {
 		for (RecordWriter<?> writer : writers) {
-			writer.clearBuffers();
+			writer.close();
 			writer.flushAll();
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index bf939c4..6cad1d3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -79,11 +79,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
@@ -92,7 +88,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.spy;
 
 /**
  * Tests for the {@link RecordWriter}.
@@ -117,87 +112,6 @@ public class RecordWriterTest {
 	// ---------------------------------------------------------------------------------------------
 
 	/**
-	 * Tests a fix for FLINK-2089.
-	 *
-	 * @see <a href="https://issues.apache.org/jira/browse/FLINK-2089">FLINK-2089</a>
-	 */
-	@Test
-	public void testClearBuffersAfterInterruptDuringBlockingBufferRequest() throws Exception {
-		ExecutorService executor = null;
-
-		try {
-			executor = Executors.newSingleThreadExecutor();
-
-			TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1);
-
-			KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider);
-
-			final RecordWriter<IntValue> recordWriter = createRecordWriter(partitionWriter);
-
-			CountDownLatch waitLock = new CountDownLatch(1);
-			Future<?> result = executor.submit(new Callable<Void>() {
-				@Override
-				public Void call() throws Exception {
-					IntValue val = new IntValue(0);
-
-					try {
-						recordWriter.emit(val);
-						recordWriter.flushAll();
-						waitLock.countDown();
-						recordWriter.emit(val);
-					}
-					catch (InterruptedException e) {
-						recordWriter.clearBuffers();
-					}
-
-					return null;
-				}
-			});
-
-			waitLock.await();
-
-			// Interrupt the Thread.
-			//
-			// The second emit call requests a new buffer and blocks the thread.
-			// When interrupting the thread at this point, clearing the buffers
-			// should not recycle any buffer.
-			result.cancel(true);
-
-			recordWriter.clearBuffers();
-
-			// Verify that the written out buffer has only been recycled once
-			// (by the partition writer), so no buffer recycled.
-			assertEquals(0, bufferProvider.getNumberOfAvailableBuffers());
-
-			partitionWriter.close();
-			assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
-		}
-		finally {
-			if (executor != null) {
-				executor.shutdown();
-			}
-		}
-	}
-
-	@Test
-	public void testSerializerClearedAfterClearBuffers() throws Exception {
-		ResultPartitionWriter partitionWriter =
-			spy(new RecyclingPartitionWriter(new TestPooledBufferProvider(1, 16)));
-
-		RecordWriter<IntValue> recordWriter = createRecordWriter(partitionWriter);
-
-		// Fill a buffer, but don't write it out.
-		recordWriter.emit(new IntValue(0));
-
-		// Clear all buffers.
-		recordWriter.clearBuffers();
-
-		// This should not throw an Exception iff the serializer state
-		// has been cleared as expected.
-		recordWriter.flushAll();
-	}
-
-	/**
 	 * Tests broadcasting events when no records have been emitted yet.
 	 */
 	@Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
index 138d5fd..3f80ea1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java
@@ -147,7 +147,7 @@ public class SlotCountExceedingParallelismTest extends TestLogger {
 				writer.flushAll();
 			}
 			finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
index f267e5f..ece1c80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleOrUpdateConsumersTest.java
@@ -166,7 +166,7 @@ public class ScheduleOrUpdateConsumersTest extends TestLogger {
 					writer.flushAll();
 				}
 				finally {
-					writer.clearBuffers();
+					writer.close();
 				}
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
index 23e5dce..788e26c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingAbstractInvokables.java
@@ -54,7 +54,7 @@ public class TestingAbstractInvokables {
 				writer.emit(new IntValue(1337));
 				writer.flushAll();
 			} finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 90ee282..84f166b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.jobmanager
 
 import org.apache.flink.runtime.execution.Environment
 import org.apache.flink.runtime.io.network.api.reader.RecordReader
-import org.apache.flink.runtime.io.network.api.writer.{RecordWriter, RecordWriterBuilder}
+import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
 import org.apache.flink.types.IntValue
 
@@ -51,7 +51,7 @@ object Tasks {
 
         writer.flushAll()
       } finally {
-        writer.clearBuffers()
+        writer.close()
       }
     }
   }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
index f6d6ad9..eae3e0e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/FileBufferReaderITCase.java
@@ -71,7 +71,13 @@ public class FileBufferReaderITCase extends TestLogger {
 
 	private static final int numRecords = 100_000;
 
-	private static final byte[] dataSource = new byte[1024];
+	private static final int bufferSize = 4096;
+
+	private static final int headerSize = 8;
+
+	private static final int recordSize = bufferSize - headerSize;
+
+	private static final byte[] dataSource = new byte[recordSize];
 
 	@BeforeClass
 	public static void setup() {
@@ -87,6 +93,7 @@ public class FileBufferReaderITCase extends TestLogger {
 		configuration.setString(RestOptions.BIND_PORT, "0");
 		configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BLOCKING_SHUFFLE_TYPE, "file");
 		configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("1g"));
+		configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse(bufferSize + "b"));
 
 		final MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)
@@ -153,12 +160,8 @@ public class FileBufferReaderITCase extends TestLogger {
 			final ByteArrayType bytes = new ByteArrayType(dataSource);
 			int counter = 0;
 			while (counter++ < numRecords) {
-				try {
-					writer.emit(bytes);
-					writer.flushAll();
-				} finally {
-					writer.clearBuffers();
-				}
+				writer.emit(bytes);
+				writer.flushAll();
 			}
 		}
 	}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index 479ae57..721ecda 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -105,7 +105,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 				}
 			}
 			finally {
-				writer.clearBuffers();
+				writer.close();
 				writer.flushAll();
 			}
 		}
@@ -139,7 +139,7 @@ public class NetworkStackThroughputITCase extends TestLogger {
 			}
 			finally {
 				reader.clearBuffers();
-				writer.clearBuffers();
+				writer.close();
 				writer.flushAll();
 			}
 		}
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
index 3f7c85d..c483eba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/ShuffleCompressionITCase.java
@@ -167,7 +167,7 @@ public class ShuffleCompressionITCase {
 				writer.broadcastEmit(RECORD_TO_SEND);
 			}
 			writer.flushAll();
-			writer.clearBuffers();
+			writer.close();
 		}
 	}
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
index df2c1ec..cfd03ca 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/scheduling/PipelinedRegionSchedulingITCase.java
@@ -163,7 +163,7 @@ public class PipelinedRegionSchedulingITCase extends TestLogger {
 				writer.emit(new IntValue(42));
 				writer.flushAll();
 			} finally {
-				writer.clearBuffers();
+				writer.close();
 			}
 
 			if (getIndexInSubtaskGroup() == 0) {


[flink] 04/07: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 900a896922403f4c1538d4b785d17b7cf72abcd7
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Aug 26 14:22:25 2020 +0800

    [FLINK-19297][network] Make ResultPartitionWriter record-oriented
    
    Currently, the ResultPartitionWriter is buffer-oriented, that is, the RecordWriter can only add buffers of different channels to the ResultPartitionWriter, and the buffer boundary serves as a natural boundary between data belonging to different channels. However, this abstraction is not flexible enough to handle new implementations like sort-based partitioning, where records are appended to a joint structure shared by all channels and sorting is used to cluster data belonging to different channels. This [...]
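    
    To make the shift concrete, here is a minimal before/after sketch (illustrative only,
    not code from this commit; partition stands for any ResultPartitionWriter and
    serializedRecord for a record already serialized into a ByteBuffer):
    
        // Before: buffer-oriented -- the caller manages BufferBuilders and
        // hands one BufferConsumer per channel to the partition.
        BufferBuilder builder = partition.getBufferBuilder(targetChannel);
        partition.addBufferConsumer(builder.createBufferConsumer(), targetChannel);
        builder.appendAndCommit(serializedRecord);
        builder.finish();
    
        // After: record-oriented -- the caller hands over serialized records and
        // all buffer management moves behind the ResultPartitionWriter interface.
        partition.emitRecord(serializedRecord, targetChannel);
        partition.broadcastRecord(serializedRecord);
    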
---
 .../api/serialization/RecordSerializer.java        |  94 ------
 .../serialization/SpanningRecordSerializer.java    | 114 --------
 .../network/api/writer/BroadcastRecordWriter.java  | 129 +-------
 .../api/writer/ChannelSelectorRecordWriter.java    |  82 +-----
 .../io/network/api/writer/RecordWriter.java        | 171 +++--------
 .../network/api/writer/ResultPartitionWriter.java  |  55 ++--
 .../partition/BufferWritingResultPartition.java    | 179 ++++++++++--
 .../io/network/partition/ResultPartition.java      |  21 +-
 .../operators/shipping/OutputCollector.java        |   3 -
 ...bleNotifyingResultPartitionWriterDecorator.java |  64 ++--
 .../SpanningRecordSerializationTest.java           |  65 +++--
 .../SpanningRecordSerializerTest.java              | 196 -------------
 .../AbstractCollectingResultPartitionWriter.java   |  70 ++---
 .../api/writer/BroadcastRecordWriterTest.java      |  77 +++--
 .../RecordCollectingResultPartitionWriter.java     |   4 +-
 ...cordOrEventCollectingResultPartitionWriter.java |  35 +--
 .../api/writer/RecordWriterDelegateTest.java       |  53 ++--
 .../io/network/api/writer/RecordWriterTest.java    | 325 +++++----------------
 .../network/netty/PartitionRequestQueueTest.java   |  21 +-
 .../partition/MockResultPartitionWriter.java       |  34 ++-
 .../PartialConsumePipelinedResultTest.java         |   7 +-
 .../io/network/partition/PartitionTestUtils.java   |  12 -
 .../partition/PipelinedSubpartitionTest.java       |  26 +-
 .../io/network/partition/ResultPartitionTest.java  | 252 ++++++++++------
 .../IteratorWrappingTestSingleInputGate.java       |  13 +-
 .../partition/consumer/LocalInputChannelTest.java  |  26 +-
 .../partition/consumer/SingleInputGateTest.java    |   6 +-
 .../io/network/util/TestPartitionProducer.java     |  16 +-
 .../io/network/util/TestProducerSource.java        |  14 +-
 .../io/network/util/TestSubpartitionProducer.java  |  13 +-
 .../operators/testutils/MockEnvironment.java       |   2 +-
 .../consumer/StreamTestSingleInputGate.java        |  11 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  12 +-
 .../runtime/tasks/StreamMockEnvironment.java       |   2 -
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   4 +-
 35 files changed, 785 insertions(+), 1423 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
deleted file mode 100644
index 6f73917..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-
-/**
- * Interface for turning records into sequences of memory segments.
- */
-public interface RecordSerializer<T extends IOReadableWritable> {
-
-	/**
-	 * Status of the serialization result.
-	 */
-	enum SerializationResult {
-		PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
-		FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
-		FULL_RECORD(true, false);
-
-		private final boolean isFullRecord;
-
-		private final boolean isFullBuffer;
-
-		SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
-			this.isFullRecord = isFullRecord;
-			this.isFullBuffer = isFullBuffer;
-		}
-
-		/**
-		 * Whether the full record was serialized and completely written to
-		 * a target buffer.
-		 *
-		 * @return <tt>true</tt> if the complete record was written
-		 */
-		public boolean isFullRecord() {
-			return this.isFullRecord;
-		}
-
-		/**
-		 * Whether the target buffer is full after the serialization process.
-		 *
-		 * @return <tt>true</tt> if the target buffer is full
-		 */
-		public boolean isFullBuffer() {
-			return this.isFullBuffer;
-		}
-	}
-
-	/**
-	 * Starts serializing the given record to an intermediate data buffer.
-	 *
-	 * @param record the record to serialize
-	 */
-	void serializeRecord(T record) throws IOException;
-
-	/**
-	 * Copies the intermediate data serialization buffer to the given target buffer.
-	 *
-	 * @param bufferBuilder the new target buffer to use
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
-	 */
-	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
-
-	/**
-	 * Supports copying an intermediate data serialization buffer to multiple target buffers
-	 * by resetting its initial position before each copying.
-	 */
-	void reset();
-
-	/**
-	 * @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}.
-	 */
-	boolean hasSerializedData();
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
deleted file mode 100644
index 1bb9ec8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Record serializer which serializes the complete record to an intermediate
- * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
- *
- * @param <T> The type of the records that are serialized.
- */
-public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
-
-	/** Flag to enable/disable checks, if buffer not set/full or pending serialization. */
-	private static final boolean CHECKED = false;
-
-	/** Intermediate data serialization. */
-	private final DataOutputSerializer serializationBuffer;
-
-	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
-	private ByteBuffer dataBuffer;
-
-	public SpanningRecordSerializer() {
-		serializationBuffer = new DataOutputSerializer(128);
-
-		// ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic)
-		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-	}
-
-	/**
-	 * Serializes the complete record to an intermediate data serialization buffer.
-	 *
-	 * @param record the record to serialize
-	 */
-	@Override
-	public void serializeRecord(T record) throws IOException {
-		if (CHECKED) {
-			if (dataBuffer.hasRemaining()) {
-				throw new IllegalStateException("Pending serialization of previous record.");
-			}
-		}
-
-		serializationBuffer.clear();
-		// the initial capacity of the serialization buffer should be no less than 4
-		serializationBuffer.skipBytesToWrite(4);
-
-		// write data and length
-		record.write(serializationBuffer);
-
-		int len = serializationBuffer.length() - 4;
-		serializationBuffer.setPosition(0);
-		serializationBuffer.writeInt(len);
-		serializationBuffer.skipBytesToWrite(len);
-
-		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-	}
-
-	/**
-	 * Copies an intermediate data serialization buffer into the target BufferBuilder.
-	 *
-	 * @param targetBuffer the target BufferBuilder to copy to
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
-	 */
-	@Override
-	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
-		targetBuffer.append(dataBuffer);
-		targetBuffer.commit();
-
-		return getSerializationResult(targetBuffer);
-	}
-
-	private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
-		if (dataBuffer.hasRemaining()) {
-			return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
-		}
-		return !targetBuffer.isFull()
-			? SerializationResult.FULL_RECORD
-			: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
-	}
-
-	@Override
-	public void reset() {
-		dataBuffer.position(0);
-	}
-
-	@Override
-	public boolean hasSerializedData() {
-		return dataBuffer.hasRemaining();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index b834738..ec800dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -18,40 +18,20 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-
-import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
- * for all the channels. Then the serialization results need be copied only once to this buffer which would be
- * shared for all the channels in a more efficient way.
+ * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and emits records to all channels for
+ * regular {@link #emit(IOReadableWritable)}.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
 public final class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
-	/** The current buffer builder shared for all the channels. */
-	@Nullable
-	private BufferBuilder bufferBuilder;
-
-	/**
-	 * The flag for judging whether {@link #requestNewBufferBuilder(int)} and {@link #flushTargetPartition(int)}
-	 * is triggered by {@link #randomEmit(IOReadableWritable)} or not.
-	 */
-	private boolean randomTriggered;
-
-	private BufferConsumer randomTriggeredConsumer;
-
 	BroadcastRecordWriter(
 			ResultPartitionWriter writer,
 			long timeout,
@@ -60,113 +40,18 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
 	}
 
 	@Override
-	public void emit(T record) throws IOException, InterruptedException {
+	public void emit(T record) throws IOException {
 		broadcastEmit(record);
 	}
 
 	@Override
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		randomEmit(record, rng.nextInt(numberOfChannels));
-	}
-
-	/**
-	 * For non-broadcast emit, we try to finish the current {@link BufferBuilder} first, and then request
-	 * a new {@link BufferBuilder} for the random channel. If this new {@link BufferBuilder} is not full,
-	 * it can be shared for all the other channels via initializing readable position in created
-	 * {@link BufferConsumer}.
-	 */
-	@VisibleForTesting
-	void randomEmit(T record, int targetChannelIndex) throws IOException, InterruptedException {
-		tryFinishCurrentBufferBuilder(targetChannelIndex);
-
-		randomTriggered = true;
-		emit(record, targetChannelIndex);
-		randomTriggered = false;
-
-		if (bufferBuilder != null) {
-			for (int index = 0; index < numberOfChannels; index++) {
-				if (index != targetChannelIndex) {
-					addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()), index);
-				}
-			}
-		}
-	}
+	public void broadcastEmit(T record) throws IOException {
+		checkErroneous();
 
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		// We could actually select any target channel here because all the channels
-		// are sharing the same BufferBuilder in broadcast mode.
-		emit(record, 0);
-	}
+		targetPartition.broadcastRecord(serializeRecord(serializer, record));
 
-	/**
-	 * The flush could be triggered by {@link #randomEmit(IOReadableWritable)}, {@link #emit(IOReadableWritable)}
-	 * or {@link #broadcastEmit(IOReadableWritable)}. Only random emit should flush a single target channel,
-	 * otherwise we should flush all the channels.
-	 */
-	@Override
-	public void flushTargetPartition(int targetChannel) {
-		if (randomTriggered) {
-			super.flushTargetPartition(targetChannel);
-		} else {
+		if (flushAlways) {
 			flushAll();
 		}
 	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return bufferBuilder != null ? bufferBuilder : requestNewBufferBuilder(targetChannel);
-	}
-
-	/**
-	 * The request could be from broadcast or non-broadcast modes like {@link #randomEmit(IOReadableWritable)}.
-	 *
-	 * <p>For non-broadcast, the created {@link BufferConsumer} is only for the target channel.
-	 *
-	 * <p>For broadcast, all the channels share the same requested {@link BufferBuilder} and the created
-	 * {@link BufferConsumer} is copied for every channel.
-	 */
-	@Override
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(bufferBuilder == null || bufferBuilder.isFinished());
-
-		BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
-		if (randomTriggered) {
-			addBufferConsumer(randomTriggeredConsumer = builder.createBufferConsumer(), targetChannel);
-		} else {
-			try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
-				for (int channel = 0; channel < numberOfChannels; channel++) {
-					addBufferConsumer(bufferConsumer.copy(), channel);
-				}
-			}
-		}
-
-		bufferBuilder = builder;
-		return builder;
-	}
-
-	@Override
-	public void tryFinishCurrentBufferBuilder(int targetChannel) {
-		if (bufferBuilder == null) {
-			return;
-		}
-
-		BufferBuilder builder = bufferBuilder;
-		bufferBuilder = null;
-
-		finishBufferBuilder(builder);
-	}
-
-	@Override
-	public void emptyCurrentBufferBuilder(int targetChannel) {
-		bufferBuilder = null;
-	}
-
-	@Override
-	public void closeBufferBuilders() {
-		if (bufferBuilder != null) {
-			bufferBuilder.finish();
-			bufferBuilder = null;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 5e32056..b94207e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -19,19 +19,17 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A regular record-oriented runtime result writer.
  *
- * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of
- * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)}
- * operation is based on {@link ChannelSelector} to select the target channel.
+ * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and emits records to the channel
+ * selected by the {@link ChannelSelector} for regular {@link #emit(IOReadableWritable)}.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
@@ -39,9 +37,6 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 
 	private final ChannelSelector<T> channelSelector;
 
-	/** Every subpartition maintains a separate buffer builder which might be null. */
-	private final BufferBuilder[] bufferBuilders;
-
 	ChannelSelectorRecordWriter(
 			ResultPartitionWriter writer,
 			ChannelSelector<T> channelSelector,
@@ -51,77 +46,28 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 
 		this.channelSelector = checkNotNull(channelSelector);
 		this.channelSelector.setup(numberOfChannels);
-
-		this.bufferBuilders = new BufferBuilder[numberOfChannels];
 	}
 
 	@Override
-	public void emit(T record) throws IOException, InterruptedException {
+	public void emit(T record) throws IOException {
 		emit(record, channelSelector.selectChannel(record));
 	}
 
 	@Override
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		emit(record, rng.nextInt(numberOfChannels));
-	}
-
-	/**
-	 * The record is serialized into intermediate serialization buffer which is then copied
-	 * into the target buffer for every channel.
-	 */
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
+	public void broadcastEmit(T record) throws IOException {
 		checkErroneous();
 
-		serializer.serializeRecord(record);
-
-		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-			copyFromSerializerToTargetChannel(targetChannel);
-		}
-	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		if (bufferBuilders[targetChannel] != null) {
-			return bufferBuilders[targetChannel];
-		} else {
-			return requestNewBufferBuilder(targetChannel);
-		}
-	}
-
-	@Override
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
-
-		BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);
-		addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
-		bufferBuilders[targetChannel] = bufferBuilder;
-		return bufferBuilder;
-	}
-
-	@Override
-	public void tryFinishCurrentBufferBuilder(int targetChannel) {
-		if (bufferBuilders[targetChannel] == null) {
-			return;
+		// Emitting to all channels in a for loop can be better than calling
+		// ResultPartitionWriter#broadcastRecord because the broadcastRecord
+		// method incurs extra overhead.
+		ByteBuffer serializedRecord = serializeRecord(serializer, record);
+		for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
+			serializedRecord.rewind();
+			emit(record, channelIndex);
 		}
-		BufferBuilder bufferBuilder = bufferBuilders[targetChannel];
-		bufferBuilders[targetChannel] = null;
 
-		finishBufferBuilder(bufferBuilder);
-	}
-
-	@Override
-	public void emptyCurrentBufferBuilder(int targetChannel) {
-		bufferBuilders[targetChannel] = null;
-	}
-
-	@Override
-	public void closeBufferBuilders() {
-		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-			if (bufferBuilders[targetChannel] != null) {
-				bufferBuilders[targetChannel].finish();
-				bufferBuilders[targetChannel] = null;
-			}
+		if (flushAlways) {
+			flushAll();
 		}
 	}
 }
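
The serializedRecord.rewind() in the loop above resets the shared ByteBuffer's position
before each reuse, since consuming a ByteBuffer advances its position. A standalone
illustration with plain JDK types (not Flink code):

    import java.nio.ByteBuffer;

    public class RewindDemo {
        public static void main(String[] args) {
            ByteBuffer record = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});

            record.get(new byte[4]);                  // a write consumes the buffer
            System.out.println(record.remaining());   // prints 0 -- nothing left to copy

            record.rewind();                          // reset position for the next channel
            System.out.println(record.remaining());   // prints 4 -- fully readable again
        }
    }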
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 995e670..b58ef38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,17 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
@@ -40,18 +32,17 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An abstract record-oriented runtime result writer.
  *
  * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
- * serializing records into buffers.
+ * channel selection and serializing records into bytes.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
@@ -63,21 +54,15 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
 
-	private final ResultPartitionWriter targetPartition;
+	protected final ResultPartitionWriter targetPartition;
 
 	protected final int numberOfChannels;
 
-	protected final RecordSerializer<T> serializer;
+	protected final DataOutputSerializer serializer;
 
 	protected final Random rng = new XORShiftRandom();
 
-	private Counter numBytesOut = new SimpleCounter();
-
-	private Counter numBuffersOut = new SimpleCounter();
-
-	protected Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
-
-	private final boolean flushAlways;
+	protected final boolean flushAlways;
 
 	/** The thread that periodically flushes the output, to give an upper latency bound. */
 	@Nullable
@@ -93,7 +78,7 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		this.targetPartition = writer;
 		this.numberOfChannels = writer.getNumberOfSubpartitions();
 
-		this.serializer = new SpanningRecordSerializer<T>();
+		this.serializer = new DataOutputSerializer(128);
 
 		checkArgument(timeout >= -1);
 		this.flushAlways = (timeout == 0);
@@ -109,89 +94,58 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		}
 	}
 
-	protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
+	protected void emit(T record, int targetSubpartition) throws IOException {
 		checkErroneous();
 
-		serializer.serializeRecord(record);
+		targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
 
-		// Make sure we don't hold onto the large intermediate serialization buffer for too long
-		copyFromSerializerToTargetChannel(targetChannel);
+		if (flushAlways) {
+			targetPartition.flush(targetSubpartition);
+		}
 	}
 
-	/**
-	 * @param targetChannel
-	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
-	 */
-	protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
-		// We should reset the initial position of the intermediate serialization buffer before
-		// copying, so the serialization results can be copied to multiple target buffers.
-		serializer.reset();
-
-		boolean pruneTriggered = false;
-		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
-		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
-		while (result.isFullBuffer()) {
-			finishBufferBuilder(bufferBuilder);
-
-			// If this was a full record, we are done. Not breaking out of the loop at this point
-			// will lead to another buffer request before breaking out (that would not be a
-			// problem per se, but it can lead to stalls in the pipeline).
-			if (result.isFullRecord()) {
-				pruneTriggered = true;
-				emptyCurrentBufferBuilder(targetChannel);
-				break;
-			}
+	public void broadcastEvent(AbstractEvent event) throws IOException {
+		broadcastEvent(event, false);
+	}
 
-			bufferBuilder = requestNewBufferBuilder(targetChannel);
-			result = serializer.copyToBufferBuilder(bufferBuilder);
-		}
-		checkState(!serializer.hasSerializedData(), "All data should be written at once");
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		targetPartition.broadcastEvent(event, isPriorityEvent);
 
 		if (flushAlways) {
-			flushTargetPartition(targetChannel);
+			flushAll();
 		}
-		return pruneTriggered;
 	}
 
-	public void broadcastEvent(AbstractEvent event) throws IOException {
-		broadcastEvent(event, false);
-	}
+	@VisibleForTesting
+	public static ByteBuffer serializeRecord(
+			DataOutputSerializer serializer,
+			IOReadableWritable record) throws IOException {
+		serializer.clear();
 
-	public void broadcastEvent(AbstractEvent event, boolean hasPriority) throws IOException {
-		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, hasPriority)) {
-			for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-				tryFinishCurrentBufferBuilder(targetChannel);
+		// the initial capacity should be no less than 4 bytes
+		serializer.skipBytesToWrite(4);
 
-				// Retain the buffer so that it can be recycled by each channel of targetPartition
-				targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
-			}
+		// write data
+		record.write(serializer);
 
-			if (flushAlways) {
-				flushAll();
-			}
-		}
+		// write length
+		int len = serializer.length() - 4;
+		serializer.setPosition(0);
+		serializer.writeInt(len);
+		serializer.skipBytesToWrite(len);
+
+		return serializer.wrapAsByteBuffer();
 	}
 
 	public void flushAll() {
 		targetPartition.flushAll();
 	}
 
-	protected void flushTargetPartition(int targetChannel) {
-		targetPartition.flush(targetChannel);
-	}
-
 	/**
 	 * Sets the metric group for this RecordWriter.
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
-		numBytesOut = metrics.getNumBytesOutCounter();
-		numBuffersOut = metrics.getNumBuffersOutCounter();
-		idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
-	}
-
-	protected void finishBufferBuilder(BufferBuilder bufferBuilder) {
-		numBytesOut.inc(bufferBuilder.finish());
-		numBuffersOut.inc();
+		targetPartition.setMetricGroup(metrics);
 	}
 
 	@Override
@@ -202,44 +156,27 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 	/**
 	 * This is used to send regular records.
 	 */
-	public abstract void emit(T record) throws IOException, InterruptedException;
+	public abstract void emit(T record) throws IOException;
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
-	public abstract void randomEmit(T record) throws IOException, InterruptedException;
-
-	/**
-	 * This is used to broadcast streaming Watermarks in-band with records.
-	 */
-	public abstract void broadcastEmit(T record) throws IOException, InterruptedException;
-
-	/**
-	 * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
-	 * request a new one for this target channel.
-	 */
-	abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
-
-	/**
-	 * Marks the current {@link BufferBuilder} as finished if present and clears the state for next one.
-	 */
-	abstract void tryFinishCurrentBufferBuilder(int targetChannel);
+	public void randomEmit(T record) throws IOException {
+		checkErroneous();
 
-	/**
-	 * Marks the current {@link BufferBuilder} as empty for the target channel.
-	 */
-	abstract void emptyCurrentBufferBuilder(int targetChannel);
+		int targetSubpartition = rng.nextInt(numberOfChannels);
+		emit(record, targetSubpartition);
+	}
 
 	/**
-	 * Marks the current {@link BufferBuilder}s as finished and releases the resources.
+	 * This is used to broadcast streaming Watermarks in-band with records.
 	 */
-	abstract void closeBufferBuilders();
+	public abstract void broadcastEmit(T record) throws IOException;
 
 	/**
 	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */
 	public void close() {
-		closeBufferBuilders();
 		// make sure we terminate the thread in any case
 		if (outputFlusher != null) {
 			outputFlusher.terminate();
@@ -277,28 +214,6 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		}
 	}
 
-	protected void addBufferConsumer(BufferConsumer consumer, int targetChannel) throws IOException {
-		targetPartition.addBufferConsumer(consumer, targetChannel);
-	}
-
-	/**
-	 * Requests a new {@link BufferBuilder} for the target channel and returns it.
-	 */
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
-		if (builder == null) {
-			long start = System.currentTimeMillis();
-			builder = targetPartition.getBufferBuilder(targetChannel);
-			idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
-		}
-		return builder;
-	}
-
-	@VisibleForTesting
-	public Meter getIdleTimeMsPerSecond() {
-		return idleTimeMsPerSecond;
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**
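
The serializeRecord helper above produces length-prefixed frames: 4 bytes of big-endian
length followed by the payload, written by reserving the header, serializing the data, and
backfilling the length. A plain-JDK sketch of the same reserve-then-backfill trick
(hypothetical helper for illustration, not Flink code):

    import java.nio.ByteBuffer;

    public class FrameDemo {
        static ByteBuffer frame(byte[] payload) {
            ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
            buf.position(4);              // reserve 4 bytes for the length header
            buf.put(payload);             // write the record data first
            int len = buf.position() - 4; // payload length, excluding the header
            buf.putInt(0, len);           // backfill the big-endian length prefix
            buf.rewind();                 // ready to be copied into BufferBuilders
            return buf;
        }
    }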
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 838850e..a90b0ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,19 +18,20 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
- * A buffer-oriented runtime result writer API for producing results.
+ * A record-oriented runtime result writer API for producing results.
  *
  * <p>If {@link ResultPartitionWriter#close()} is called before {@link ResultPartitionWriter#fail(Throwable)} or
  * {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and cancellation of production.
@@ -51,30 +52,27 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	int getNumTargetKeyGroups();
 
 	/**
-	 * Requests a {@link BufferBuilder} from this partition for writing data.
+	 * Writes the given serialized record to the target subpartition.
 	 */
-	BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
+	void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
 
+	/**
+	 * Writes the given serialized record to all subpartitions. One can also achieve the same effect by emitting
+	 * the same record to all subpartitions one by one; however, this method can have better performance because
+	 * the underlying implementation can apply optimizations, for example copying the given serialized record only
+	 * once to a shared channel which can be consumed by all subpartitions.
+	 */
+	void broadcastRecord(ByteBuffer record) throws IOException;
 
 	/**
-	 * Try to request a {@link BufferBuilder} from this partition for writing data.
-	 *
-	 * <p>Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
+	 * Writes the given {@link AbstractEvent} to all channels.
 	 */
-	BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException;
+	void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;
 
 	/**
-	 * Adds the bufferConsumer to the subpartition with the given index.
-	 *
-	 * <p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing
-	 * it's resources.
-	 *
-	 * <p>To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one
-	 * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
-	 *
-	 * @return true if operation succeeded and bufferConsumer was enqueued for consumption.
+	 * Sets the metric group for the {@link ResultPartitionWriter}.
 	 */
-	boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException;
+	void setMetricGroup(TaskIOMetricGroup metrics);
 
 	/**
 	 * Returns a reader for the subpartition with the given index.
@@ -82,12 +80,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException;
 
 	/**
-	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in all subpartitions.
+	 * Manually trigger the consumption of data from all subpartitions.
 	 */
 	void flushAll();
 
 	/**
-	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
+	 * Manually trigger the consumption of data from the given subpartition.
 	 */
 	void flush(int subpartitionIndex);
 
@@ -108,4 +106,19 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	 * <p>Closing of partition is still needed afterwards.
 	 */
 	void finish() throws IOException;
+
+	boolean isFinished();
+
+	/**
+	 * Releases the partition writer, which releases the produced data; afterwards no reader can consume
+	 * the partition any more.
+	 */
+	void release(Throwable cause);
+
+	boolean isReleased();
+
+	/**
+	 * Closes the partition writer which releases the allocated resource, for example the buffer pool.
+	 */
+	void close() throws Exception;
 }
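
Reading the interface as a whole, the lifecycle contract is: emit records while in the
produce state, then either finish() for a successful end or fail(Throwable)/release() for
an abnormal one, and close() in all cases to free resources. A hedged sketch of a
well-behaved producer (hasMoreRecords and nextSerializedRecord are placeholder helpers,
not part of the interface):

    ResultPartitionWriter writer = ...;
    try {
        while (hasMoreRecords()) {
            writer.emitRecord(nextSerializedRecord(), 0);
        }
        writer.finish();       // marks a successful end of production
    } catch (Exception e) {
        writer.fail(e);        // abruptly triggers failure and cancellation
    } finally {
        writer.close();        // always free allocated resources, e.g. the buffer pool
    }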
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 437d0a0..bd9e8ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -19,16 +19,23 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkElementIndex;
@@ -49,6 +56,14 @@ public class BufferWritingResultPartition extends ResultPartition {
 	/** The subpartitions of this partition. At least one. */
 	protected final ResultSubpartition[] subpartitions;
 
+	/** For non-broadcast mode, each subpartition maintains a separate BufferBuilder which might be null. */
+	private final BufferBuilder[] subpartitionBufferBuilders;
+
+	/** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */
+	private BufferBuilder broadcastBufferBuilder;
+
+	private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
+
 	public BufferWritingResultPartition(
 		String owningTaskName,
 		int partitionIndex,
@@ -72,6 +87,7 @@ public class BufferWritingResultPartition extends ResultPartition {
 			bufferPoolFactory);
 
 		this.subpartitions = checkNotNull(subpartitions);
+		this.subpartitionBufferBuilders = new BufferBuilder[subpartitions.length];
 	}
 
 	public int getNumberOfQueuedBuffers() {
@@ -90,46 +106,58 @@ public class BufferWritingResultPartition extends ResultPartition {
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkInProduceState();
-
-		return bufferPool.requestBufferBuilderBlocking(targetChannel);
+	public void flushAll() {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
 	}
 
 	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return bufferPool.requestBufferBuilder(targetChannel);
+	public void flush(int targetSubpartition) {
+		subpartitions[targetSubpartition].flush();
 	}
 
-	@Override
-	public boolean addBufferConsumer(
-			BufferConsumer bufferConsumer,
-			int subpartitionIndex) throws IOException {
-		checkNotNull(bufferConsumer);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		do {
+			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
+			bufferBuilder.appendAndCommit(record);
 
-		ResultSubpartition subpartition;
-		try {
-			checkInProduceState();
-			subpartition = subpartitions[subpartitionIndex];
-		}
-		catch (Exception ex) {
-			bufferConsumer.close();
-			throw ex;
-		}
+			if (bufferBuilder.isFull()) {
+				finishSubpartitionBufferBuilder(targetSubpartition);
+			}
+		} while (record.hasRemaining());
+	}
+
+	@Override
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		do {
+			final BufferBuilder bufferBuilder = getBroadcastBufferBuilder();
+			bufferBuilder.appendAndCommit(record);
 
-		return subpartition.add(bufferConsumer);
+			if (bufferBuilder.isFull()) {
+				finishBroadcastBufferBuilder();
+			}
+		} while (record.hasRemaining());
 	}
 
 	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		checkInProduceState();
+		finishBroadcastBufferBuilder();
+		finishSubpartitionBufferBuilders();
+
+		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				// Retain the buffer so that it can be recycled by each channel of targetPartition
+				subpartition.add(eventBufferConsumer.copy());
+			}
 		}
 	}
 
 	@Override
-	public void flush(int targetSubpartition) {
-		subpartitions[targetSubpartition].flush();
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		super.setMetricGroup(metrics);
+		idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
 	}
 
 	@Override
@@ -149,9 +177,13 @@ public class BufferWritingResultPartition extends ResultPartition {
 
 	@Override
 	public void finish() throws IOException {
+		finishBroadcastBufferBuilder();
+		finishSubpartitionBufferBuilders();
+
 		for (ResultSubpartition subpartition : subpartitions) {
 			subpartition.finish();
 		}
+
 		super.finish();
 	}
 
@@ -169,6 +201,101 @@ public class BufferWritingResultPartition extends ResultPartition {
 		}
 	}
 
+	private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+		if (bufferBuilder != null) {
+			return bufferBuilder;
+		}
+
+		return getNewSubpartitionBufferBuilder(targetSubpartition);
+	}
+
+	private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+		checkInProduceState();
+		ensureUnicastMode();
+
+		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
+		subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+		return bufferBuilder;
+	}
+
+	private BufferBuilder getBroadcastBufferBuilder() throws IOException {
+		if (broadcastBufferBuilder != null) {
+			return broadcastBufferBuilder;
+		}
+
+		return getNewBroadcastBufferBuilder();
+	}
+
+	private BufferBuilder getNewBroadcastBufferBuilder() throws IOException {
+		checkInProduceState();
+		ensureBroadcastMode();
+
+		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(0);
+		broadcastBufferBuilder = bufferBuilder;
+
+		try (final BufferConsumer consumer = bufferBuilder.createBufferConsumer()) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				subpartition.add(consumer.copy());
+			}
+		}
+
+		return bufferBuilder;
+	}
+
+	private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition) throws IOException {
+		BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
+		if (bufferBuilder != null) {
+			return bufferBuilder;
+		}
+
+		final long start = System.currentTimeMillis();
+		try {
+			bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
+			idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
+			return bufferBuilder;
+		} catch (InterruptedException e) {
+			throw new IOException("Interrupted while waiting for buffer", e);
+		}
+	}
+
+	private void finishSubpartitionBufferBuilder(int targetSubpartition) {
+		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+		if (bufferBuilder != null) {
+			numBytesOut.inc(bufferBuilder.finish());
+			numBuffersOut.inc();
+			subpartitionBufferBuilders[targetSubpartition] = null;
+		}
+	}
+
+	private void finishSubpartitionBufferBuilders() {
+		for (int channelIndex = 0; channelIndex < numSubpartitions; channelIndex++) {
+			finishSubpartitionBufferBuilder(channelIndex);
+		}
+	}
+
+	private void finishBroadcastBufferBuilder() {
+		if (broadcastBufferBuilder != null) {
+			numBytesOut.inc(broadcastBufferBuilder.finish() * numSubpartitions);
+			numBuffersOut.inc(numSubpartitions);
+			broadcastBufferBuilder = null;
+		}
+	}
+
+	private void ensureUnicastMode() {
+		finishBroadcastBufferBuilder();
+	}
+
+	private void ensureBroadcastMode() {
+		finishSubpartitionBufferBuilders();
+	}
+
+	@VisibleForTesting
+	public Meter getIdleTimeMsPerSecond() {
+		return idleTimeMsPerSecond;
+	}
+
 	@VisibleForTesting
 	public ResultSubpartition[] getAllPartitions() {
 		return subpartitions;
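
For context, a minimal sketch of how the new record-oriented path above is driven
end to end. serializeRecord, emitRecord and the per-subpartition BufferBuilder
caching come from this patch; the partition setup and the IntValue record are
assumptions for illustration only:

    import java.nio.ByteBuffer;
    import org.apache.flink.core.memory.DataOutputSerializer;
    import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    import org.apache.flink.types.IntValue;

    // 'partition' stands for any BufferWritingResultPartition obtained from a
    // ResultPartitionBuilder; its construction is elided in this sketch.
    DataOutputSerializer serializer = new DataOutputSerializer(128);
    ByteBuffer serialized = RecordWriter.serializeRecord(serializer, new IntValue(42));

    // emitRecord appends to the cached BufferBuilder of the target subpartition;
    // whenever that buffer fills up, it is finished (updating numBytesOut and
    // numBuffersOut) and a fresh one is requested, so a record larger than one
    // buffer transparently spans several buffers.
    partition.emitRecord(serialized, 0);

Note also the mode switch: getNewSubpartitionBufferBuilder calls ensureUnicastMode,
which finishes any in-flight broadcast buffer, and getNewBroadcastBufferBuilder does
the reverse, so unicast and broadcast data never share a buffer.
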
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 02c91c5..bd81c19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -103,6 +106,10 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	@Nullable
 	protected final BufferCompressor bufferCompressor;
 
+	protected Counter numBytesOut = new SimpleCounter();
+
+	protected Counter numBuffersOut = new SimpleCounter();
+
 	public ResultPartition(
 		String owningTaskName,
 		int partitionIndex,
@@ -202,13 +209,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		isFinished = true;
 	}
 
+	@Override
+	public boolean isFinished() {
+		return isFinished;
+	}
+
 	public void release() {
 		release(null);
 	}
 
-	/**
-	 * Releases the result partition.
-	 */
 	public void release(Throwable cause) {
 		if (isReleased.compareAndSet(false, true)) {
 			LOG.debug("{}: Releasing {}.", owningTaskName, this);
@@ -248,6 +257,12 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		return numTargetKeyGroups;
 	}
 
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		numBytesOut = metrics.getNumBytesOutCounter();
+		numBuffersOut = metrics.getNumBuffersOutCounter();
+	}
+
 	/**
 	 * Releases buffers held by this result partition.
 	 *
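
The two counters default to throw-away SimpleCounter instances so that a partition
can already be written to before metrics are attached. A hedged sketch of the
expected wiring; the attachMetrics helper is hypothetical and only illustrates the
setMetricGroup call from this patch:

    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;

    // Hypothetical helper: attach the task's IO metrics to every partition
    // writer before records are emitted.
    static void attachMetrics(ResultPartitionWriter[] writers, TaskIOMetricGroup ioMetrics) {
        for (ResultPartitionWriter writer : writers) {
            writer.setMetricGroup(ioMetrics); // replaces the SimpleCounter placeholders
        }
    }
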
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index d8cd9ec..a2dd4c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -68,9 +68,6 @@ public class OutputCollector<T> implements Collector<T> {
 			catch (IOException e) {
 				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
 			}
-			catch (InterruptedException e) {
-				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
-			}
 		}
 		else {
 			throw new NullPointerException("The system does not support records that are null. "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 3451939..3eca089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -21,17 +21,18 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
 import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -69,16 +70,6 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return partitionWriter.getBufferBuilder(targetChannel);
-	}
-
-	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return partitionWriter.tryGetBufferBuilder(targetChannel);
-	}
-
-	@Override
 	public ResultPartitionID getPartitionId() {
 		return partitionWriter.getPartitionId();
 	}
@@ -99,18 +90,34 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
-	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		return partitionWriter.createSubpartitionView(index, availabilityListener);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		partitionWriter.emitRecord(record, targetSubpartition);
+
+		notifyPipelinedConsumers();
 	}
 
 	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer,	int subpartitionIndex) throws IOException {
-		boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex);
-		if (success) {
-			notifyPipelinedConsumers();
-		}
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		partitionWriter.broadcastRecord(record);
+
+		notifyPipelinedConsumers();
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		partitionWriter.broadcastEvent(event, isPriorityEvent);
+
+		notifyPipelinedConsumers();
+	}
 
-		return success;
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		partitionWriter.setMetricGroup(metrics);
+	}
+
+	@Override
+	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
+		return partitionWriter.createSubpartitionView(index, availabilityListener);
 	}
 
 	@Override
@@ -131,6 +138,21 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
+	public boolean isFinished() {
+		return partitionWriter.isFinished();
+	}
+
+	@Override
+	public void release(Throwable cause) {
+		partitionWriter.release(cause);
+	}
+
+	@Override
+	public boolean isReleased() {
+		return partitionWriter.isReleased();
+	}
+
+	@Override
 	public void fail(Throwable throwable) {
 		partitionWriter.fail(throwable);
 	}
@@ -152,7 +174,7 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	 * this will trigger the deployment of consuming tasks after the first record or event has been written.
 	 */
 	private void notifyPipelinedConsumers() {
-		if (!hasNotifiedPipelinedConsumers) {
+		if (!hasNotifiedPipelinedConsumers && !partitionWriter.isReleased()) {
 			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionWriter.getPartitionId(), taskActions);
 
 			hasNotifiedPipelinedConsumers = true;
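
All three output paths of the decorator (emitRecord, broadcastRecord and
broadcastEvent) funnel into notifyPipelinedConsumers, which fires at most once per
partition; the added isReleased check keeps a partition that was already released,
for example after a task failure, from triggering the deployment of its consumers.
The same logic as above, condensed with explanatory comments added:

    private void notifyPipelinedConsumers() {
        // notify exactly once, and never for a partition that is already dead
        if (!hasNotifiedPipelinedConsumers && !partitionWriter.isReleased()) {
            partitionConsumableNotifier.notifyPartitionConsumable(
                jobId, partitionWriter.getPartitionId(), taskActions);
            hasNotifiedPipelinedConsumers = true; // latch: later calls are no-ops
        }
    }
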
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index e35b311..9c748db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -116,17 +118,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
-		testSerializationRoundTrip(records, segmentSize, serializer, deserializer);
+		testSerializationRoundTrip(records, segmentSize, deserializer);
 	}
 
 	/**
-	 * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link RecordDeserializer}
-	 * interact as expected.
+	 * Iterates over the provided records and tests whether {@link RecordWriter#serializeRecord} and
+	 * {@link RecordDeserializer} interact as expected.
 	 *
 	 * <p>Only a single {@link MemorySegment} will be allocated.
 	 *
@@ -136,14 +137,14 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	private static void testSerializationRoundTrip(
 			Iterable<SerializationTestType> records,
 			int segmentSize,
-			RecordSerializer<SerializationTestType> serializer,
 			RecordDeserializer<SerializationTestType> deserializer)
 		throws Exception {
+		final DataOutputSerializer serializer = new DataOutputSerializer(128);
 		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+		BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer.wrapAsByteBuffer(), segmentSize);
 
 		int numRecords = 0;
 		for (SerializationTestType record : records) {
@@ -153,18 +154,21 @@ public class SpanningRecordSerializationTest extends TestLogger {
 			numRecords++;
 
 			// serialize record
-			serializer.serializeRecord(record);
-			if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+			serializer.clear();
+			ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
+			serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+			if (serializationResult.getBufferBuilder().isFull()) {
 				// buffer is full => start deserializing
 				deserializer.setNextBuffer(serializationResult.buildBuffer());
 
 				numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer);
 
 				// move buffers as long as necessary (for long records)
-				while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+				while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
 					deserializer.setNextBuffer(serializationResult.buildBuffer());
 				}
 			}
+			Assert.assertFalse(serializedRecord.hasRemaining());
 		}
 
 		// deserialize left over records
@@ -183,18 +187,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
 
 		// assert that all records have been serialized and deserialized
 		Assert.assertEquals(0, numRecords);
-		Assert.assertFalse(serializer.hasSerializedData());
 		Assert.assertFalse(deserializer.hasUnfinishedData());
 	}
 
 	@Test
 	public void testSmallRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
 	}
 
 	/**
@@ -202,33 +204,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	 */
 	@Test
 	public void testSpanningRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
 	}
 
 	@Test
 	public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
 	}
 
 	@Test
 	public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
 		testUnconsumedBuffer(
-			serializer,
 			deserializer,
 			Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
 			1,
@@ -237,7 +235,6 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		deserializer.clear();
 
 		testUnconsumedBuffer(
-			serializer,
 			deserializer,
 			Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
 			1,
@@ -245,17 +242,18 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	}
 
 	public void testUnconsumedBuffer(
-			RecordSerializer<SerializationTestType> serializer,
 			RecordDeserializer<SerializationTestType> deserializer,
 			SerializationTestType record,
 			int segmentSize,
 			byte... leftOverBytes) throws Exception {
 		try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream()) {
-			serializer.serializeRecord(record);
+			DataOutputSerializer serializer = new DataOutputSerializer(128);
+			ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
 
-			BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+			BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize);
 
-			if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+			serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+			if (serializationResult.getBufferBuilder().isFull()) {
 				// buffer is full => start deserializing
 				Buffer buffer = serializationResult.buildBuffer();
 				writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
@@ -265,7 +263,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 				deserializer.getNextRecord(record.getClass().newInstance());
 
 				// move buffers as long as necessary (for long records)
-				while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+				while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
 					buffer = serializationResult.buildBuffer();
 
 					if (serializationResult.isFullRecord()) {
@@ -310,7 +308,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	}
 
 	private static BufferAndSerializerResult setNextBufferForSerializer(
-			RecordSerializer<SerializationTestType> serializer,
+			ByteBuffer serializedRecord,
 			int segmentSize) throws IOException {
 		// create a bufferBuilder with some random starting offset to properly test handling buffer slices in the
 		// deserialization code.
@@ -319,24 +317,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
 		bufferConsumer.build().recycleBuffer();
 
+		bufferBuilder.appendAndCommit(serializedRecord);
 		return new BufferAndSerializerResult(
 			bufferBuilder,
 			bufferConsumer,
-			serializer.copyToBufferBuilder(bufferBuilder));
+			bufferBuilder.isFull(),
+			!serializedRecord.hasRemaining());
 	}
 
 	private static class BufferAndSerializerResult {
 		private final BufferBuilder bufferBuilder;
 		private final BufferConsumer bufferConsumer;
-		private final RecordSerializer.SerializationResult serializationResult;
+		private final boolean isFullBuffer;
+		private final boolean isFullRecord;
 
 		public BufferAndSerializerResult(
 				BufferBuilder bufferBuilder,
 				BufferConsumer bufferConsumer,
-				RecordSerializer.SerializationResult serializationResult) {
+				boolean isFullBuffer,
+				boolean isFullRecord) {
 			this.bufferBuilder = bufferBuilder;
 			this.bufferConsumer = bufferConsumer;
-			this.serializationResult = serializationResult;
+			this.isFullBuffer = isFullBuffer;
+			this.isFullRecord = isFullRecord;
 		}
 
 		public BufferBuilder getBufferBuilder() {
@@ -348,11 +351,11 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		}
 
 		public boolean isFullBuffer() {
-			return serializationResult.isFullBuffer();
+			return isFullBuffer;
 		}
 
 		public boolean isFullRecord() {
-			return serializationResult.isFullRecord();
+			return isFullRecord;
 		}
 	}
 }
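
The round trip above captures the pattern that replaces the old two-phase
SpanningRecordSerializer. A minimal standalone sketch, assuming 'record' is any
IOReadableWritable and nextBufferBuilder() is a hypothetical supplier of fresh
BufferBuilders:

    import java.nio.ByteBuffer;
    import org.apache.flink.core.memory.DataOutputSerializer;
    import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
    import org.apache.flink.runtime.io.network.buffer.BufferBuilder;

    DataOutputSerializer out = new DataOutputSerializer(128);
    out.clear();                                     // reuse the serializer across records
    ByteBuffer serialized = RecordWriter.serializeRecord(out, record);

    while (serialized.hasRemaining()) {              // a record may span buffers
        BufferBuilder builder = nextBufferBuilder(); // hypothetical supplier
        builder.appendAndCommit(serialized);         // copies as much as fits
        if (builder.isFull()) {
            builder.finish();                        // hand the full buffer downstream
        }
    }

The two flags the test tracks fall out directly: the buffer is full exactly when
isFull() returns true, and the record is complete exactly when hasRemaining()
returns false.
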
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
deleted file mode 100644
index e5f5dfc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
-import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.testutils.serialization.types.Util;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-
-/**
- * Tests for the {@link SpanningRecordSerializer}.
- */
-public class SpanningRecordSerializerTest {
-
-	@Test
-	public void testHasSerializedData() throws IOException {
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
-
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		serializer.serializeRecord(randomIntRecord);
-		Assert.assertTrue(serializer.hasSerializedData());
-
-		final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
-		serializer.copyToBufferBuilder(bufferBuilder1);
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
-		serializer.reset();
-		serializer.copyToBufferBuilder(bufferBuilder2);
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		serializer.reset();
-		serializer.copyToBufferBuilder(bufferBuilder2);
-		// Buffer builder full!
-		Assert.assertTrue(serializer.hasSerializedData());
-	}
-
-	@Test
-	public void testEmptyRecords() throws IOException {
-		final int segmentSize = 11;
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-		final BufferBuilder bufferBuilder1 = createBufferBuilder(segmentSize);
-
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
-			serializer.copyToBufferBuilder(bufferBuilder1));
-
-		SerializationTestType emptyRecord = new SerializationTestType() {
-			@Override
-			public SerializationTestType getRandom(Random rnd) {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public int length() {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public void write(DataOutputView out) {}
-
-			@Override
-			public void read(DataInputView in) {}
-
-			@Override
-			public int hashCode() {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public boolean equals(Object obj) {
-				throw new UnsupportedOperationException();
-			}
-		};
-
-		serializer.serializeRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
-		serializer.reset();
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
-		serializer.reset();
-		Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
-			serializer.copyToBufferBuilder(bufferBuilder1));
-
-		final BufferBuilder bufferBuilder2 = createBufferBuilder(segmentSize);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
-			serializer.copyToBufferBuilder(bufferBuilder2));
-	}
-
-	@Test
-	public void testIntRecordsSpanningMultipleSegments() throws Exception {
-		final int segmentSize = 1;
-		final int numValues = 10;
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testIntRecordsWithAlignedSegments() throws Exception {
-		final int segmentSize = 64;
-		final int numValues = 64;
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testIntRecordsWithUnalignedSegments() throws Exception {
-		final int segmentSize = 31;
-		final int numValues = 248; // least common multiple => last record should align
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testRandomRecords() throws Exception {
-		final int segmentSize = 127;
-		final int numValues = 100000;
-
-		test(Util.randomRecords(numValues), segmentSize);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
-	 * {@link RecordSerializer.SerializationResult} values.
-	 *
-	 * <p>Only a single {@link MemorySegment} will be allocated.
-	 *
-	 * @param records records to test
-	 * @param segmentSize size for the {@link MemorySegment}
-	 */
-	private void test(Util.MockRecords records, int segmentSize) throws Exception {
-		final int serializationOverhead = 4; // length encoding
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-
-		// -------------------------------------------------------------------------------------------------------------
-
-		BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
-		int numBytes = 0;
-		for (SerializationTestType record : records) {
-			serializer.serializeRecord(record);
-			RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
-			numBytes += record.length() + serializationOverhead;
-
-			if (numBytes < segmentSize) {
-				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
-			} else if (numBytes == segmentSize) {
-				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				bufferBuilder = createBufferBuilder(segmentSize);
-				numBytes = 0;
-			} else {
-				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
-
-				while (result.isFullBuffer()) {
-					numBytes -= segmentSize;
-					bufferBuilder = createBufferBuilder(segmentSize);
-					result = serializer.copyToBufferBuilder(bufferBuilder);
-				}
-
-				Assert.assertTrue(result.isFullRecord());
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 84aa17e..66eb8f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -18,79 +18,45 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
+import java.nio.ByteBuffer;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * {@link ResultPartitionWriter} that collects the emitted output into a list.
  */
 @ThreadSafe
 public abstract class AbstractCollectingResultPartitionWriter extends MockResultPartitionWriter {
-	private final BufferProvider bufferProvider;
-	private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
-
-	public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
-		this.bufferProvider = checkNotNull(bufferProvider);
-	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-	}
 
 	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return bufferProvider.requestBufferBuilder(targetChannel);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		checkArgument(targetSubpartition < getNumberOfSubpartitions());
+		deserializeRecord(record);
 	}
 
 	@Override
-	public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
-		checkState(targetChannel < getNumberOfSubpartitions());
-		bufferConsumers.add(bufferConsumer);
-		processBufferConsumers();
-		return true;
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		deserializeRecord(record);
 	}
 
-	private void processBufferConsumers() throws IOException {
-		while (!bufferConsumers.isEmpty()) {
-			BufferConsumer bufferConsumer = bufferConsumers.peek();
-			Buffer buffer = bufferConsumer.build();
-			try {
-				deserializeBuffer(buffer);
-				if (!bufferConsumer.isFinished()) {
-					break;
-				}
-				bufferConsumers.pop().close();
-			}
-			finally {
-				buffer.recycleBuffer();
-			}
-		}
-	}
+	private void deserializeRecord(ByteBuffer serializedRecord) throws IOException {
+		checkArgument(serializedRecord.hasArray());
 
-	@Override
-	public synchronized void flushAll() {
-		try {
-			processBufferConsumers();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void flush(int subpartitionIndex) {
-		flushAll();
+		MemorySegment segment = MemorySegmentFactory.wrap(serializedRecord.array());
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
+		buffer.setSize(serializedRecord.remaining());
+		deserializeBuffer(buffer);
+		buffer.recycleBuffer();
 	}
 
 	protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
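
deserializeRecord above relies on the serialized ByteBuffer being array-backed,
which holds for the heap-backed buffers a DataOutputSerializer hands out. A
self-contained sketch of the same wrapping trick, using a hypothetical hand-built
record (4-byte length prefix followed by a 4-byte payload):

    import java.nio.ByteBuffer;
    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;
    import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
    import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;

    // hypothetical serialized record: length prefix 4, then the int payload 42
    ByteBuffer serialized = ByteBuffer.wrap(new byte[] {0, 0, 0, 4, 0, 0, 0, 42});

    MemorySegment segment = MemorySegmentFactory.wrap(serialized.array());
    NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
    buffer.setSize(serialized.remaining()); // expose only the written bytes
    // ... feed 'buffer' into a RecordDeserializer, then release it:
    buffer.recycleBuffer();
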
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
index dccfd1d..d318961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
@@ -31,12 +33,12 @@ import org.apache.flink.testutils.serialization.types.Util;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 import static org.junit.Assert.assertEquals;
 
@@ -56,19 +58,12 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 	 */
 	@Test
 	public void testBroadcastMixedRandomEmitRecord() throws Exception {
-		final int numberOfChannels = 4;
+		final int numberOfChannels = 8;
 		final int numberOfRecords = 8;
 		final int bufferSize = 32;
 
-		@SuppressWarnings("unchecked")
-		final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+		final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
 		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 			new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
@@ -84,7 +79,7 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 		int index = 0;
 		for (SerializationTestType record : records) {
 			int randomChannel = index++ % numberOfChannels;
-			writer.randomEmit(record, randomChannel);
+			writer.emit(record, randomChannel);
 			serializedRecords.get(randomChannel).add(record);
 
 			writer.broadcastEmit(record);
@@ -93,23 +88,23 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 			}
 		}
 
-		final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers();
+		final int numberOfCreatedBuffers = partition.getBufferPool().bestEffortGetNumOfUsedBuffers();
 		// verify the expected number of requested buffers; every emit switches the write mode, so each call requests a new buffer
-		assertEquals(numberOfRecords, numberOfCreatedBuffers);
+		assertEquals(2 * numberOfRecords, numberOfCreatedBuffers);
 
 		for (int i = 0; i < numberOfChannels; i++) {
 			// every channel queues all broadcast buffers created above plus its single unicast buffer
-			assertEquals(numberOfRecords, queues[i].size());
+			assertEquals(numberOfRecords + 1, partition.getNumberOfQueuedBuffers(i));
 
 			final int excessRandomRecords = i < numberOfRecords % numberOfChannels ? 1 : 0;
 			final int numberOfRandomRecords = numberOfRecords / numberOfChannels + excessRandomRecords;
 			final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;
 			// verify the data correctness in every channel queue
 			verifyDeserializationResults(
-				queues[i],
+				partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()),
 				deserializer,
 				serializedRecords.get(i),
-				numberOfCreatedBuffers,
+				numberOfRecords + 1,
 				numberOfTotalRecords);
 		}
 	}
@@ -121,45 +116,41 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 	@Test
 	public void testRandomEmitAndBufferRecycling() throws Exception {
 		int recordSize = 8;
+		int numberOfChannels = 2;
 
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(2, 2 * recordSize);
-		final KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider) {
-			@Override
-			public int getNumberOfSubpartitions() {
-				return 2;
-			}
-		};
-		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+		ResultPartition partition = createResultPartition(2 * recordSize, numberOfChannels);
+		BufferPool bufferPool = partition.getBufferPool();
+		BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
 
 		// force materialization of both buffers for easier availability tests
-		List<Buffer> buffers = Arrays.asList(bufferProvider.requestBuffer(), bufferProvider.requestBuffer());
+		List<Buffer> buffers = Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer());
 		buffers.forEach(Buffer::recycleBuffer);
-		assertEquals(2, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(2, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// fill first buffer
-		writer.randomEmit(new IntType(1), 0);
+		writer.broadcastEmit(new IntType(1));
 		writer.broadcastEmit(new IntType(2));
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// simulate consumption of the first buffer; this should not free any buffers
-		assertEquals(1, partitionWriter.getAddedBufferConsumers(0).size());
-		closeConsumer(partitionWriter, 0, 2 * recordSize);
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+		ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		closeConsumer(view0, 2 * recordSize);
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// use second buffer
-		writer.broadcastEmit(new IntType(3));
-		assertEquals(0, bufferProvider.getNumberOfAvailableBuffers());
+		writer.emit(new IntType(3), 0);
+		assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// fully free first buffer
-		assertEquals(2, partitionWriter.getAddedBufferConsumers(1).size());
-		closeConsumer(partitionWriter, 1, recordSize);
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+		ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+		closeConsumer(view1, 2 * recordSize);
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 	}
 
-	public void closeConsumer(KeepingPartitionWriter partitionWriter, int subpartitionIndex, int expectedSize) {
-		BufferConsumer bufferConsumer = partitionWriter.getAddedBufferConsumers(subpartitionIndex).get(0);
-		Buffer buffer = bufferConsumer.build();
-		bufferConsumer.close();
+	public void closeConsumer(ResultSubpartitionView view, int expectedSize) throws IOException {
+		Buffer buffer = view.getNextBuffer().buffer();
 		assertEquals(expectedSize, buffer.getSize());
 		buffer.recycleBuffer();
 	}
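
The buffer accounting asserted in testBroadcastMixedRandomEmitRecord follows
directly from the mode switching in BufferWritingResultPartition. A derivation,
using the constants above (8 channels, 8 records, 8-byte serialized IntType):

    // per loop iteration: one emit(record, channel) and one broadcastEmit(record)
    //  - emit(...) switches to unicast mode, finishing the open broadcast buffer
    //    and requesting a fresh unicast buffer        -> 8 unicast requests
    //  - broadcastEmit(...) switches back and requests
    //    a fresh broadcast buffer                     -> 8 broadcast requests
    //  => bestEffortGetNumOfUsedBuffers() == 2 * numberOfRecords == 16
    //
    // per channel: all 8 broadcast buffers are added to every subpartition, and
    // the 8 round-robined unicast records land on exactly one channel each
    //  => getNumberOfQueuedBuffers(i) == numberOfRecords + 1 == 9
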
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
index 1285c4e..4a9f7ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.writer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.types.Record;
 
 import java.io.IOException;
@@ -39,8 +38,7 @@ public class RecordCollectingResultPartitionWriter extends AbstractCollectingRes
 	private final RecordDeserializer<Record> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 		new String[]{System.getProperty("java.io.tmpdir")});
 
-	public RecordCollectingResultPartitionWriter(List<Record> output, BufferProvider bufferProvider) {
-		super(bufferProvider);
+	public RecordCollectingResultPartitionWriter(List<Record> output) {
 		this.output = checkNotNull(output);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
index 3d3073b..5540a53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 
@@ -44,35 +42,32 @@ public class RecordOrEventCollectingResultPartitionWriter<T> extends AbstractCol
 
 	public RecordOrEventCollectingResultPartitionWriter(
 			Collection<Object> output,
-			BufferProvider bufferProvider,
 			TypeSerializer<T> serializer) {
-		super(bufferProvider);
 		this.output = checkNotNull(output);
 		this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer));
 	}
 
 	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		output.add(event);
+	}
+
+	@Override
 	protected void deserializeBuffer(Buffer buffer) throws IOException {
-		if (buffer.isBuffer()) {
-			deserializer.setNextBuffer(buffer);
+		deserializer.setNextBuffer(buffer);
 
-			while (deserializer.hasUnfinishedData()) {
-				RecordDeserializer.DeserializationResult result =
-					deserializer.getNextRecord(delegate);
+		while (deserializer.hasUnfinishedData()) {
+			RecordDeserializer.DeserializationResult result =
+				deserializer.getNextRecord(delegate);
 
-				if (result.isFullRecord()) {
-					output.add(delegate.getInstance());
-				}
+			if (result.isFullRecord()) {
+				output.add(delegate.getInstance());
+			}
 
-				if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-					|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
-					break;
-				}
+			if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+				|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+				break;
 			}
-		} else {
-			// is event
-			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
-			output.add(event);
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index adb059c..5a0ea8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -20,28 +20,25 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -51,16 +48,17 @@ import static org.junit.Assert.assertTrue;
  */
 public class RecordWriterDelegateTest extends TestLogger {
 
+	private static final int recordSize = 8;
+
 	private static final int numberOfBuffers = 10;
 
 	private static final int memorySegmentSize = 128;
 
-	private static final int numberOfSegmentsToRequest = 2;
-
 	private NetworkBufferPool globalPool;
 
 	@Before
 	public void setup() {
+		assertEquals("Illegal memory segment size,", 0, memorySegmentSize % recordSize);
 		globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize);
 	}
 
@@ -103,11 +101,11 @@ public class RecordWriterDelegateTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	public void testSingleRecordWriterBroadcastEvent() throws Exception {
 		// setup
-		final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
-		final RecordWriter recordWriter = createRecordWriter(queues);
+		final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+		final RecordWriter recordWriter = new RecordWriterBuilder<>().build(partition);
 		final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);
 
-		verifyBroadcastEvent(writerDelegate, queues, 1);
+		verifyBroadcastEvent(writerDelegate, Collections.singletonList(partition));
 	}
 
 	@Test
@@ -116,14 +114,16 @@ public class RecordWriterDelegateTest extends TestLogger {
 		// setup
 		final int numRecordWriters = 2;
 		final List<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters);
-		final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
+		final List<ResultPartition> partitions = new ArrayList<>(numRecordWriters);
 
 		for (int i = 0; i < numRecordWriters; i++) {
-			recordWriters.add(createRecordWriter(queues));
+			final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+			partitions.add(partition);
+			recordWriters.add(new RecordWriterBuilder<>().build(partition));
 		}
 		final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);
 
-		verifyBroadcastEvent(writerDelegate, queues, numRecordWriters);
+		verifyBroadcastEvent(writerDelegate, partitions);
 	}
 
 	private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
@@ -136,14 +136,6 @@ public class RecordWriterDelegateTest extends TestLogger {
 		return new RecordWriterBuilder().build(partition);
 	}
 
-	private RecordWriter createRecordWriter(ArrayDeque<BufferConsumer>[] queues) {
-		final ResultPartitionWriter partition = new RecordWriterTest.CollectingPartitionWriter(
-			queues,
-			new TestPooledBufferProvider(1));
-
-		return new RecordWriterBuilder().build(partition);
-	}
-
 	private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
 		// writer is available at the beginning
 		assertTrue(writerDelegate.isAvailable());
@@ -151,13 +143,14 @@ public class RecordWriterDelegateTest extends TestLogger {
 
 		// fill one buffer by emitting records, exhausting the local pool and making the writer unavailable
 		RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
-		final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
+		for (int i = 0; i < memorySegmentSize / recordSize; ++i) {
+			recordWriter.emit(new IntValue(i));
+		}
 		assertFalse(writerDelegate.isAvailable());
 		CompletableFuture future = writerDelegate.getAvailableFuture();
 		assertFalse(future.isDone());
 
 		// recycle the buffer to make the local pool available again
-		BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
 		ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener());
 		Buffer buffer = readView.getNextBuffer().buffer();
 
@@ -169,18 +162,18 @@ public class RecordWriterDelegateTest extends TestLogger {
 
 	private void verifyBroadcastEvent(
 			RecordWriterDelegate writerDelegate,
-			ArrayDeque<BufferConsumer>[] queues,
-			int numRecordWriters) throws Exception {
+			List<ResultPartition> partitions) throws Exception {
 
 		final CancelCheckpointMarker message = new CancelCheckpointMarker(1);
 		writerDelegate.broadcastEvent(message);
 
 		// verify that the event was queued in every subpartition of every partition
-		for (int i = 0; i < queues.length; i++) {
-			assertEquals(numRecordWriters, queues[i].size());
+		for (ResultPartition partition : partitions) {
+			for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
+				assertEquals(1, partition.getNumberOfQueuedBuffers(i));
 
-			for (int j = 0; j < numRecordWriters; j++) {
-				BufferOrEvent boe = RecordWriterTest.parseBuffer(queues[i].remove(), i);
+				ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+				BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(message, boe.getEvent());
 			}
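
In verifyAvailability above, the writer is made unavailable by arithmetic rather
than by grabbing a BufferBuilder directly: each emitted IntValue serializes to
recordSize = 8 bytes (a 4-byte length prefix plus the 4-byte value), so
memorySegmentSize / recordSize = 16 emits fill exactly one 128-byte segment; the
setup assertion guards that this division is exact. With the test's pool sized so
that one in-flight buffer exhausts it, isAvailable() flips to false. A worked
check (illustration only):

    // 16 emits x 8 bytes/record = 128 bytes = one full memory segment
    int emits = memorySegmentSize / recordSize;      // 128 / 8 = 16
    assert emits * recordSize == memorySegmentSize;  // nothing spills into a 2nd buffer
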
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 6cad1d3..a0956ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -27,34 +27,33 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.DeserializationUtils;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.operators.shipping.OutputEmitter;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
@@ -65,7 +64,6 @@ import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
 
-import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -74,19 +72,13 @@ import org.junit.rules.TemporaryFolder;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -119,35 +111,27 @@ public class RecordWriterTest {
 		int numberOfChannels = 4;
 		int bufferSize = 32;
 
-		@SuppressWarnings("unchecked")
-		Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
-		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+		ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
 
-		assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
+		assertEquals(0, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			assertEquals(1, queues[i].size());
-			BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+			assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
 			assertTrue(boe.isEvent());
 			assertEquals(barrier, boe.getEvent());
-			assertEquals(0, queues[i].size());
+			assertFalse(view.isAvailable(Integer.MAX_VALUE));
 		}
 	}
 
 	/**
-	 * Tests broadcasting events when records have been emitted. The emitted
-	 * records cover all three {@link SerializationResult} types.
+	 * Tests broadcasting events when records have been emitted.
 	 */
 	@Test
 	public void testBroadcastEventMixedRecords() throws Exception {
@@ -156,16 +140,8 @@ public class RecordWriterTest {
 		int bufferSize = 32;
 		int lenBytes = 4; // serialized length
 
-		@SuppressWarnings("unchecked")
-		Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
-		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+		ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// Emit records on some channels first (requesting buffers), then
@@ -194,34 +170,43 @@ public class RecordWriterTest {
 		writer.broadcastEvent(barrier);
 
 		if (isBroadcastWriter) {
-			assertEquals(3, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(3, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 
 			for (int i = 0; i < numberOfChannels; i++) {
-				assertEquals(4, queues[i].size()); // 3 buffer + 1 event
+				assertEquals(4, partition.getNumberOfQueuedBuffers(i)); // 3 buffers + 1 event
 
+				ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
 				for (int j = 0; j < 3; j++) {
-					assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+					assertTrue(parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer());
 				}
 
-				BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+				BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(barrier, boe.getEvent());
 			}
 		} else {
-			assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(4, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
+			ResultSubpartitionView[] views = new ResultSubpartitionView[4];
+
+			assertEquals(2, partition.getNumberOfQueuedBuffers(0)); // 1 buffer + 1 event
+			views[0] = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer());
 
-			assertEquals(2, queues[0].size()); // 1 buffer + 1 event
-			assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
-			assertEquals(3, queues[1].size()); // 2 buffers + 1 event
-			assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-			assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-			assertEquals(2, queues[2].size()); // 1 buffer + 1 event
-			assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
-			assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+			assertEquals(3, partition.getNumberOfQueuedBuffers(1)); // 2 buffers + 1 event
+			views[1] = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+			assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+
+			assertEquals(2, partition.getNumberOfQueuedBuffers(2)); // 1 buffer + 1 event
+			views[2] = partition.createSubpartitionView(2, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer());
+
+			views[3] = partition.createSubpartitionView(3, new NoOpBufferAvailablityListener());
+			assertEquals(1, partition.getNumberOfQueuedBuffers(3)); // 0 buffers + 1 event
 
 			// every queue's last element should be the event
 			for (int i = 0; i < numberOfChannels; i++) {
-				BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+				BufferOrEvent boe = parseBuffer(views[i].getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(barrier, boe.getEvent());
 			}
@@ -234,31 +219,28 @@ public class RecordWriterTest {
 	 */
 	@Test
 	public void testBroadcastEventBufferReferenceCounting() throws Exception {
+		int bufferSize = 32 * 1024;
+		int numSubpartitions = 2;
 
-		@SuppressWarnings("unchecked")
-		ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
-
-		ResultPartitionWriter partition =
-			new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+		ResultPartition partition = createResultPartition(bufferSize, numSubpartitions);
 		RecordWriter<?> writer = createRecordWriter(partition);
 
 		writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
 
-		// Verify added to all queues
-		assertEquals(1, queues[0].size());
-		assertEquals(1, queues[1].size());
-
 		// get references to the event buffers (copies of the original broadcast event buffer)
-		BufferConsumer bufferConsumer1 = queues[0].getFirst();
-		BufferConsumer bufferConsumer2 = queues[1].getFirst();
+		Buffer[] buffers = new Buffer[numSubpartitions];
 
 		// process all collected events (recycles the buffer)
-		for (int i = 0; i < queues.length; i++) {
-			assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
+		for (int i = 0; i < numSubpartitions; i++) {
+			assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			buffers[i] = view.getNextBuffer().buffer();
+			assertTrue(parseBuffer(buffers[i], i).isEvent());
 		}
 
-		assertTrue(bufferConsumer1.isRecycled());
-		assertTrue(bufferConsumer2.isRecycled());
+		for (int i = 0; i < numSubpartitions; ++i) {
+			assertTrue(buffers[i].isRecycled());
+		}
 	}
 
 	/**
@@ -289,15 +271,8 @@ public class RecordWriterTest {
 		final int numValues = 8;
 		final int serializationLength = 4;
 
-		@SuppressWarnings("unchecked")
-		final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		final RecordWriter<SerializationTestType> writer = createRecordWriter(partitionWriter);
+		final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		final RecordWriter<SerializationTestType> writer = createRecordWriter(partition);
 		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 			new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
@@ -310,14 +285,15 @@ public class RecordWriterTest {
 
 		final int numRequiredBuffers = numValues / (bufferSize / (4 + serializationLength));
 		if (isBroadcastWriter) {
-			assertEquals(numRequiredBuffers, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(numRequiredBuffers, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 		} else {
-			assertEquals(numRequiredBuffers * numberOfChannels, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(numRequiredBuffers * numberOfChannels, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 		}
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			assertEquals(numRequiredBuffers, queues[i].size());
-			verifyDeserializationResults(queues[i], deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
+			assertEquals(numRequiredBuffers, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			verifyDeserializationResults(view, deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
 		}
 	}
 
@@ -345,7 +321,7 @@ public class RecordWriterTest {
 			assertTrue(recordWriter.getAvailableFuture().isDone());
 
 			// request one buffer from the local pool to make it unavailable afterwards
-			final BufferBuilder bufferBuilder = resultPartition.getBufferBuilder(0);
+			final BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0);
 			assertNotNull(bufferBuilder);
 			assertFalse(recordWriter.getAvailableFuture().isDone());
 
@@ -418,64 +394,8 @@ public class RecordWriterTest {
 		}
 	}
 
-	@Test
-	public void testIdleTime() throws IOException, InterruptedException {
-		// setup
-		final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
-		final BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
-		final ResultPartitionWriter resultPartition = new ResultPartitionBuilder()
-			.setBufferPoolFactory(p -> localPool)
-			.build();
-		resultPartition.setup();
-		final ResultPartitionWriter partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator(
-			new NoOpTaskActions(),
-			new JobID(),
-			resultPartition,
-			new NoOpResultPartitionConsumableNotifier());
-		final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
-		BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
-		BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
-		ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
-		Buffer buffer = readView.getNextBuffer().buffer();
-
-		// idle time is zero when there is buffer available.
-		assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
-
-		CountDownLatch syncLock = new CountDownLatch(1);
-		AtomicReference<BufferBuilder> asyncRequestResult = new AtomicReference<>();
-		final Thread requestThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					// notify that the request thread start to run.
-					syncLock.countDown();
-					// wait for buffer.
-					asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
-				} catch (Exception e) {
-				}
-			}
-		});
-		requestThread.start();
-
-		// wait until request thread start to run.
-		syncLock.await();
-
-		Thread.sleep(10);
-
-		//recycle the buffer
-		buffer.recycleBuffer();
-		requestThread.join();
-
-		assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
-		assertNotNull(asyncRequestResult.get());
-	}
-
 	private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
-		@SuppressWarnings("unchecked")
-		ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
-
-		ResultPartitionWriter partition =
-			new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+		ResultPartition partition = createResultPartition(4096, 2);
 		RecordWriter<IntValue> writer = createRecordWriter(partition);
 
 		if (broadcastEvent) {
@@ -485,12 +405,15 @@ public class RecordWriterTest {
 		}
 
 		// verify added to all queues
-		assertEquals(1, queues[0].size());
-		assertEquals(1, queues[1].size());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+		assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+
+		ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
 
 		// these two buffers may share the memory but not the indices!
-		Buffer buffer1 = buildSingleBuffer(queues[0].remove());
-		Buffer buffer2 = buildSingleBuffer(queues[1].remove());
+		Buffer buffer1 = view0.getNextBuffer().buffer();
+		Buffer buffer2 = view1.getNextBuffer().buffer();
 		assertEquals(0, buffer1.getReaderIndex());
 		assertEquals(0, buffer2.getReaderIndex());
 		buffer1.setReaderIndex(1);
@@ -498,18 +421,19 @@ public class RecordWriterTest {
 	}
 
 	protected void verifyDeserializationResults(
-			Queue<BufferConsumer> queue,
+			ResultSubpartitionView view,
 			RecordDeserializer<SerializationTestType> deserializer,
 			ArrayDeque<SerializationTestType> expectedRecords,
 			int numRequiredBuffers,
 			int numValues) throws Exception {
 		int assertRecords = 0;
 		for (int j = 0; j < numRequiredBuffers; j++) {
-			Buffer buffer = buildSingleBuffer(queue.remove());
+			Buffer buffer = view.getNextBuffer().buffer();
 			deserializer.setNextBuffer(buffer);
 
 			assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
 		}
+		assertFalse(view.isAvailable(Integer.MAX_VALUE));
 		Assert.assertEquals(numValues, assertRecords);
 	}
 
@@ -526,51 +450,18 @@ public class RecordWriterTest {
 		}
 	}
 
+	public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException {
+		NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build();
+		ResultPartition partition = createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions);
+		partition.setup();
+		return partition;
+	}
+
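
A minimal sketch of how the tests above use this helper (all types and calls
are taken from this diff; the enclosing test scaffolding is elided):

    ResultPartition partition = createResultPartition(32, 4); // bufferSize, numSubpartitions
    RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
    writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);

    ResultSubpartitionView view =
        partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
    BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), 0);
    assertTrue(boe.isEvent());

Output is now read back through subpartition views rather than through the
per-channel queues of the removed CollectingPartitionWriter.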
 	// ---------------------------------------------------------------------------------------------
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
 
-	/**
-	 * Partition writer that collects the added buffers/events in multiple queue.
-	 */
-	static class CollectingPartitionWriter extends MockResultPartitionWriter {
-		private final Queue<BufferConsumer>[] queues;
-		private final BufferProvider bufferProvider;
-
-		/**
-		 * Create the partition writer.
-		 *
-		 * @param queues one queue per outgoing channel
-		 * @param bufferProvider buffer provider
-		 */
-		CollectingPartitionWriter(Queue<BufferConsumer>[] queues, BufferProvider bufferProvider) {
-			this.queues = queues;
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public int getNumberOfSubpartitions() {
-			return queues.length;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-
-		@Override
-		public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) {
-			return queues[targetChannel].add(buffer);
-		}
-	}
-
-	static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
-		Buffer buffer = buildSingleBuffer(bufferConsumer);
+	static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException {
 		if (buffer.isBuffer()) {
 			return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel));
 		} else {
@@ -581,68 +472,6 @@ public class RecordWriterTest {
 		}
 	}
 
-	/**
-	 * Partition writer that recycles all received buffers and does no further processing.
-	 */
-	private static class RecyclingPartitionWriter extends MockResultPartitionWriter {
-		private final BufferProvider bufferProvider;
-
-		private RecyclingPartitionWriter(BufferProvider bufferProvider) {
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-	}
-
-	static class KeepingPartitionWriter extends MockResultPartitionWriter {
-		private final BufferProvider bufferProvider;
-		private Map<Integer, List<BufferConsumer>> produced = new HashMap<>();
-
-		KeepingPartitionWriter(BufferProvider bufferProvider) {
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-
-		@Override
-		public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) {
-			// keep the buffer occupied.
-			produced.putIfAbsent(targetChannel, new ArrayList<>());
-			produced.get(targetChannel).add(bufferConsumer);
-			return true;
-		}
-
-		public List<BufferConsumer> getAddedBufferConsumers(int subpartitionIndex) {
-			return produced.get(subpartitionIndex);
-		}
-
-		@Override
-		public void close() {
-			for (List<BufferConsumer> bufferConsumers : produced.values()) {
-				for (BufferConsumer bufferConsumer : bufferConsumers) {
-					bufferConsumer.close();
-				}
-			}
-			produced.clear();
-		}
-	}
-
 	private static class ByteArrayIO implements IOReadableWritable {
 
 		private final byte[] bytes;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3b6a2f5..f9616bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -21,22 +21,21 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -55,9 +54,11 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -478,14 +479,12 @@ public class PartitionRequestQueueTest {
 	}
 
 	private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
-		final ResultPartition partition = new ResultPartitionBuilder()
-			.setResultPartitionType(ResultPartitionType.BLOCKING)
-			.setFileChannelManager(fileChannelManager)
-			.setResultPartitionManager(partitionManager)
-			.build();
-
-		partitionManager.registerResultPartition(partition);
-		PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+		NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(partitionManager).build();
+		ResultPartition partition = createPartition(environment, fileChannelManager, ResultPartitionType.BLOCKING, 1);
+
+		partition.setup();
+		partition.emitRecord(ByteBuffer.allocate(BUFFER_SIZE), 0);
+		partition.finish();
 
 		return partition;
 	}
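
This replaces PartitionTestUtils#writeBuffers (removed later in this patch):
test data now flows through the record-oriented writer API itself. The
essential calls as a stand-alone sketch, assuming an already built BLOCKING
partition:

    partition.setup();
    partition.emitRecord(ByteBuffer.allocate(BUFFER_SIZE), 0); // one buffer-sized record to subpartition 0
    partition.finish(); // a finished blocking partition becomes consumable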
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index ad21bf4..220225e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -54,14 +55,15 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 	}
 
 	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
-		bufferConsumer.close();
-		return true;
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		throw new UnsupportedOperationException();
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
 	}
 
 	@Override
@@ -69,8 +71,8 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 		throw new UnsupportedOperationException();
 	}
 
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		throw new UnsupportedOperationException();
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
 	}
 
 	@Override
@@ -90,6 +92,20 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 	}
 
 	@Override
+	public boolean isFinished() {
+		return false;
+	}
+
+	@Override
+	public void release(Throwable cause) {
+	}
+
+	@Override
+	public boolean isReleased() {
+		return false;
+	}
+
+	@Override
 	public CompletableFuture<?> getAvailableFuture() {
 		return AVAILABLE;
 	}
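
Since the buffer-oriented methods are gone from the interface, tests now stub
the ByteBuffer-based methods instead. A sketch of a hypothetical subclass that
counts emitted records (the class name and counter are illustrative, not part
of this patch):

    class CountingPartitionWriter extends MockResultPartitionWriter {
        private int numRecords; // illustrative state

        @Override
        public void emitRecord(ByteBuffer record, int targetSubpartition) {
            numRecords++; // a test could also copy the record's bytes here
        }
    }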
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 462cc4b..1ee8dd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 /**
  * Test for consuming a pipelined result only partially.
  */
@@ -118,10 +119,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
 
 			for (int i = 0; i < 8; i++) {
-				final BufferBuilder bufferBuilder = writer.getBufferBuilder(0);
-				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+				writer.emitRecord(ByteBuffer.allocate(1024), 0);
 				Thread.sleep(50);
-				bufferBuilder.finish();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index a545b73..c6d7819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -31,7 +30,6 @@ import org.hamcrest.Matchers;
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -131,14 +129,4 @@ public enum PartitionTestUtils {
 			1,
 			true);
 	}
-
-	public static void writeBuffers(
-			ResultPartitionWriter partition,
-			int numberOfBuffers,
-			int bufferSize) throws IOException {
-		for (int i = 0; i < numberOfBuffers; i++) {
-			partition.addBufferConsumer(createFilledFinishedBufferConsumer(bufferSize), 0);
-		}
-		partition.finish();
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 95aee00..c806a19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -23,12 +23,9 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
@@ -39,7 +36,6 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +43,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
-import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -141,41 +136,32 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
-		final int producerBufferPoolSize = 8;
 		final int producerNumberOfBuffersToProduce = 128;
+		final int bufferSize = 32 * 1024;
 
 		// Producer behaviour
 		final TestProducerSource producerSource = new TestProducerSource() {
 
-			private BufferProvider bufferProvider = new TestPooledBufferProvider(producerBufferPoolSize);
-
 			private int numberOfBuffers;
 
 			@Override
-			public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+			public BufferAndChannel getNextBuffer() throws Exception {
 				if (numberOfBuffers == producerNumberOfBuffersToProduce) {
 					return null;
 				}
 
-				final BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
-				final BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-				int segmentSize = bufferBuilder.getMaxCapacity();
-
-				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
+				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
 
-				int next = numberOfBuffers * (segmentSize / Integer.BYTES);
+				int next = numberOfBuffers * (bufferSize / Integer.BYTES);
 
-				for (int i = 0; i < segmentSize; i += 4) {
+				for (int i = 0; i < bufferSize; i += 4) {
 					segment.putInt(i, next);
 					next++;
 				}
 
-				checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) == segmentSize);
-				bufferBuilder.finish();
-
 				numberOfBuffers++;
 
-				return new BufferConsumerAndChannel(bufferConsumer, 0);
+				return new BufferAndChannel(segment.getArray(), 0);
 			}
 		};
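
TestProducerSource is now byte[]-based, so producers no longer need a
BufferProvider. Reconstructed from the call sites in this patch (the interface
itself is not shown here), a minimal source looks roughly like:

    TestProducerSource source = new TestProducerSource() {
        private int produced; // illustrative counter

        @Override
        public BufferAndChannel getNextBuffer() {
            // emit eight 4-byte payloads on channel 0, then signal completion with null
            return produced++ < 8 ? new BufferAndChannel(new byte[4], 0) : null;
        }
    };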
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index e293422..d221e99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -26,49 +26,46 @@ import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
 
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
  * Tests for {@link ResultPartition}.
@@ -79,6 +76,8 @@ public class ResultPartitionTest {
 
 	private static FileChannelManager fileChannelManager;
 
+	private final int bufferSize = 1024;
+
 	@BeforeClass
 	public static void setUp() {
 		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
@@ -115,32 +114,45 @@ public class ResultPartitionTest {
 	 */
 	@Test
 	public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+		FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] {
+			writer -> ((ResultPartitionWriter) writer).finish(),
+			writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0),
+			writer -> ((ResultPartitionWriter) writer).broadcastEvent(EndOfPartitionEvent.INSTANCE, false),
+			writer -> ((ResultPartitionWriter) writer).broadcastRecord(ByteBuffer.allocate(bufferSize))
+		};
+
+		for (FutureConsumerWithException notificationCall : notificationCalls) {
+			testSendScheduleOrUpdateConsumersMessage(notificationCall);
+		}
+	}
+
+	private void testSendScheduleOrUpdateConsumersMessage(
+			FutureConsumerWithException<ResultPartitionWriter, Exception> notificationCall) throws Exception {
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
 
 		{
 			// Pipelined, send message => notify
-			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 			ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
 				ResultPartitionType.PIPELINED,
 				taskActions,
 				jobId,
 				notifier);
-			consumableNotifyingPartitionWriter.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
-			verify(notifier, times(1))
-				.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+			notificationCall.accept(consumableNotifyingPartitionWriter);
+			notifier.check(jobId, consumableNotifyingPartitionWriter.getPartitionId(), taskActions, 1);
 		}
 
 		{
 			// Blocking, send message => don't notify
-			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 			ResultPartitionWriter partition = createConsumableNotifyingResultPartitionWriter(
 				ResultPartitionType.BLOCKING,
 				taskActions,
 				jobId,
 				notifier);
-			partition.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
-			verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+			notificationCall.accept(partition);
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -181,38 +193,32 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
+	 * Tests {@link ResultPartition#emitRecord} on a partition which has already finished.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
 	private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
-		JobID jobId = new JobID();
-		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
-			partitionType,
-			taskActions,
-			jobId,
-			notifier);
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
+			new NoOpTaskActions(),
+			new JobID(),
+			notifier)[0];
 		try {
-			consumableNotifyingPartitionWriter.finish();
-			reset(notifier);
-			// partition.add() should fail
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			Assert.fail("exception expected");
+			partitionWriter.finish();
+			notifier.reset();
+			// partitionWriter.emitRecord() should fail
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} catch (IllegalStateException e) {
 			// expected => ignored
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-				Assert.fail("bufferConsumer not recycled");
-			}
+			assertEquals(0, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(0, bufferWritingResultPartition.numBytesOut.getCount());
+			assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should not have notified either
-			verify(notifier, never()).notifyPartitionConsumable(
-				eq(jobId),
-				eq(consumableNotifyingPartitionWriter.getPartitionId()),
-				eq(taskActions));
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -227,35 +233,30 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
+	 * Tests {@link ResultPartition#emitRecord} on a partition which has already been released.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
-	private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
-		JobID jobId = new JobID();
-		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
-			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
-		ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+	private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {partition},
-			taskActions,
-			jobId,
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
+			new NoOpTaskActions(),
+			new JobID(),
 			notifier)[0];
 		try {
-			partition.release();
-			// partition.add() silently drops the bufferConsumer but recycles it
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			assertTrue(partition.isReleased());
+			partitionWriter.release(null);
+			// partitionWriter.emitRecord() should silently drop the given record
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-				Assert.fail("bufferConsumer not recycled");
-			}
+			assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+			// the buffer should be recycled because the result partition has already been released
+			assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should not have notified either
-			verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -289,32 +290,31 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
+	 * Tests {@link ResultPartition#emitRecord} on a working partition.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
 	private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
-			partitionType,
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
 			taskActions,
 			jobId,
-			notifier);
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
+			notifier)[0];
 		try {
-			// partition.add() adds the bufferConsumer without recycling it (if not spilling)
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
+			// partitionWriter.emitRecord() will allocate a new buffer and copy the record into it
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-			}
+			assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+			assertEquals(1, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should have been notified for pipelined partitions
 			if (partitionType.isPipelined()) {
-				verify(notifier, times(1))
-					.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+				notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1);
 			}
 		}
 	}
@@ -332,15 +332,14 @@ public class ResultPartitionTest {
 	private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
 		final int numAllBuffers = 10;
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
-			.setNumNetworkBuffers(numAllBuffers).build();
+			.setNumNetworkBuffers(numAllBuffers).setBufferSize(bufferSize).build();
 		final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
 		try {
 			resultPartition.setup();
 
 			// take all buffers (more than the minimum required)
 			for (int i = 0; i < numAllBuffers; ++i) {
-				BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking();
-				resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+				resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 			}
 			resultPartition.finish();
 
@@ -366,10 +365,12 @@ public class ResultPartitionTest {
 	 * Tests {@link ResultPartition#getAvailableFuture()}.
 	 */
 	@Test
-	public void testIsAvailableOrNot() throws IOException, InterruptedException {
+	public void testIsAvailableOrNot() throws IOException {
 		final int numAllBuffers = 10;
+		final int bufferSize = 1024;
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
-				.setNumNetworkBuffers(numAllBuffers).build();
+				.setNumNetworkBuffers(numAllBuffers)
+				.setBufferSize(bufferSize).build();
 		final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1);
 
 		try {
@@ -379,8 +380,8 @@ public class ResultPartitionTest {
 
 			assertTrue(resultPartition.getAvailableFuture().isDone());
 
-			resultPartition.getBufferBuilder(0);
-			resultPartition.getBufferBuilder(0);
+			resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+			resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 			assertFalse(resultPartition.getAvailableFuture().isDone());
 		} finally {
 			resultPartition.release();
@@ -434,17 +435,25 @@ public class ResultPartitionTest {
 			ResultPartitionType partitionType,
 			TaskActions taskActions,
 			JobID jobId,
-			ResultPartitionConsumableNotifier notifier) {
-		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
-			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
+			ResultPartitionConsumableNotifier notifier) throws IOException {
+		ResultPartition partition = createResultPartition(partitionType);
 		return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {partition},
+			new ResultPartitionWriter[]{partition},
 			taskActions,
 			jobId,
 			notifier)[0];
 	}
 
+	private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNumNetworkBuffers(10)
+			.setBufferSize(bufferSize).build();
+		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1);
+		resultPartition.setup();
+		return (BufferWritingResultPartition) resultPartition;
+	}
+
 	@Test
 	public void testInitializeEmptyState() throws Exception {
 		final int totalBuffers = 2;
@@ -558,6 +567,50 @@ public class ResultPartitionTest {
 		}
 	}
 
+	@Test
+	public void testIdleTime() throws IOException, InterruptedException {
+		// setup
+		int bufferSize = 1024;
+		NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
+		BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
+		BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
+			.setBufferPoolFactory(p -> localPool)
+			.build();
+		resultPartition.setup();
+
+		resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+		ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		Buffer buffer = readView.getNextBuffer().buffer();
+		assertNotNull(buffer);
+
+		// idle time is zero while a buffer is available.
+		assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount());
+
+		CountDownLatch syncLock = new CountDownLatch(1);
+		final Thread requestThread = new Thread(() -> {
+			try {
+				// signal that the request thread has started running.
+				syncLock.countDown();
+				// block until a buffer becomes available.
+				resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+			} catch (Exception e) {
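+				// ignored: the trailing assertions fail if the record was never emitted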
+			}
+		});
+		requestThread.start();
+
+		// wait until the request thread has started running.
+		syncLock.await();
+
+		Thread.sleep(100);
+
+		// recycle the buffer so that the blocked emitRecord() can get it
+		buffer.recycleBuffer();
+		requestThread.join();
+
+		Assert.assertThat(resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
+		assertNotNull(readView.getNextBuffer().buffer());
+	}
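
Note that the idle-time metric moved together with the buffer requests: the
removed RecordWriterTest#testIdleTime measured recordWriter.getIdleTimeMsPerSecond(),
while this test reads resultPartition.getIdleTimeMsPerSecond(), since blocking
buffer requests now happen inside the partition's emitRecord() rather than in
the writer.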
+
 	/**
 	 * The {@link ChannelStateReader} instance for restoring the specific number of states.
 	 */
@@ -631,4 +684,33 @@ public class ResultPartitionTest {
 		public void close() {
 		}
 	}
+
+	private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+		private JobID jobID;
+		private ResultPartitionID partitionID;
+		private TaskActions taskActions;
+		private int numNotification;
+
+		@Override
+		public void notifyPartitionConsumable(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions) {
+			++numNotification;
+			this.jobID = jobID;
+			this.partitionID = partitionID;
+			this.taskActions = taskActions;
+		}
+
+		private void check(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions, int numNotification) {
+			assertEquals(jobID, this.jobID);
+			assertEquals(partitionID, this.partitionID);
+			assertEquals(taskActions, this.taskActions);
+			assertEquals(numNotification, this.numNotification);
+		}
+
+		private void reset() {
+			jobID = null;
+			partitionID = null;
+			taskActions = null;
+			numNotification = 0;
+		}
+	}
 }
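
A sketch of the check/reset pattern this notifier enables in place of
Mockito's verify()/reset() (argument values as used in the tests above):

    TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();

    // expect exactly one notification carrying exactly these arguments...
    notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1);

    // ...and, after a reset, none at all (null arguments assert the fields were never set)
    notifier.reset();
    notifier.check(null, null, null, 0);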
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4a5f445..a4985cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,6 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
@@ -49,7 +50,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 	private MutableObjectIterator<T> inputIterator;
 
-	private RecordSerializer<T> serializer;
+	private DataOutputSerializer serializer;
 
 	private final T reuse;
 
@@ -68,7 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 	private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
 		inputIterator = iterator;
-		serializer = new SpanningRecordSerializer<T>();
+		serializer = new DataOutputSerializer(128);
 
 		// The input iterator can produce an infinite stream. That's why we have to serialize each
 		// record on demand and cannot do it upfront.
@@ -79,10 +80,10 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 			@Override
 			public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
 				if (hasData) {
-					serializer.serializeRecord(reuse);
+					ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, reuse);
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
 					BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-					serializer.copyToBufferBuilder(bufferBuilder);
+					bufferBuilder.appendAndCommit(serializedRecord);
 
 					hasData = inputIterator.next(reuse) != null;
 
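
The two-step RecordSerializer flow (serializeRecord followed by
copyToBufferBuilder) collapses here into serialize-then-append. A
self-contained sketch of the new path, with the record value and sizes purely
illustrative:

    DataOutputSerializer serializer = new DataOutputSerializer(128);
    ByteBuffer serialized = RecordWriter.serializeRecord(serializer, new IntValue(42));
    BufferBuilder bufferBuilder = createBufferBuilder(32 * 1024);
    BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
    bufferBuilder.appendAndCommit(serialized); // copies the length-prefixed record bytes into the buffer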
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 6e65ad74..90cbc6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
@@ -60,7 +59,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -135,8 +133,8 @@ public class LocalInputChannelTest {
 				.setNumberOfSubpartitions(parallelism)
 				.setNumTargetKeyGroups(parallelism)
 				.setResultPartitionManager(partitionManager)
-				.setBufferPoolFactory(p ->
-					networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
+				.setBufferPoolFactory(p -> networkBuffers.createBufferPool(
+					producerBufferPoolSize, producerBufferPoolSize, null, parallelism, Integer.MAX_VALUE))
 				.build();
 
 			// Create a buffer pool for this partition
@@ -144,11 +142,11 @@ public class LocalInputChannelTest {
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
-				partition,
+				(BufferWritingResultPartition) partition,
 				false,
 				new TestPartitionProducerBufferSource(
 					parallelism,
-					partition.getBufferPool(),
+					TestBufferFactory.BUFFER_SIZE,
 					numberOfBuffersPerChannel)
 			);
 		}
@@ -519,16 +517,16 @@ public class LocalInputChannelTest {
 	 */
 	private static class TestPartitionProducerBufferSource implements TestProducerSource {
 
-		private final BufferProvider bufferProvider;
+		private final int bufferSize;
 
 		private final List<Byte> channelIndexes;
 
 		public TestPartitionProducerBufferSource(
 				int parallelism,
-				BufferProvider bufferProvider,
+				int bufferSize,
 				int numberOfBuffersToProduce) {
 
-			this.bufferProvider = bufferProvider;
+			this.bufferSize = bufferSize;
 			this.channelIndexes = Lists.newArrayListWithCapacity(
 					parallelism * numberOfBuffersToProduce);
 
@@ -544,14 +542,10 @@ public class LocalInputChannelTest {
 		}
 
 		@Override
-		public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+		public BufferAndChannel getNextBuffer() throws Exception {
 			if (channelIndexes.size() > 0) {
 				final int channelIndex = channelIndexes.remove(0);
-				BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
-				BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-				bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4]));
-				bufferBuilder.finish();
-				return new BufferConsumerAndChannel(bufferConsumer, channelIndex);
+				return new BufferAndChannel(new byte[bufferSize], channelIndex);
 			}
 
 			return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3787627..d16ee7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -753,7 +753,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testQueuedBuffers() throws Exception {
 		final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
 
-		final ResultPartition resultPartition = new ResultPartitionBuilder()
+		final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
 			.setResultPartitionManager(network.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
 			.build();
@@ -784,7 +784,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			remoteInputChannel.onBuffer(createBuffer(1), 0, 0);
 			assertEquals(1, inputGate.getNumberOfQueuedBuffers());
 
-			resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1), 0);
+			resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
 			assertEquals(2, inputGate.getNumberOfQueuedBuffers());
 		} finally {
 			resultPartition.release();
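[Note on the record-oriented writer API used above: emitRecord() replaces
addBufferConsumer() on the producer side, so the partition requests and fills
network buffers internally. A minimal sketch, assuming (as the cast in the
hunk does) that ResultPartitionBuilder produces a BufferWritingResultPartition:

    BufferWritingResultPartition partition =
        (BufferWritingResultPartition) new ResultPartitionBuilder().build();
    partition.setup(); // registers the buffer pool before any records are emitted
    // Hand over a serialized record; buffer allocation and copying
    // happen inside the partition.
    partition.emitRecord(ByteBuffer.allocate(1), 0);
]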
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index 2dfb4c9..9d94402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.io.network.util;
 
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
 
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
@@ -38,7 +39,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
 	public static final int MAX_SLEEP_TIME_MS = 20;
 
 	/** The partition to add data to. */
-	private final ResultPartition partition;
+	private final BufferWritingResultPartition partition;
 
 	/**
 	 * Flag indicating whether the producer is slow. If true, the producer will sleep a random
@@ -53,7 +54,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
 	private final Random random;
 
 	public TestPartitionProducer(
-			ResultPartition partition,
+			BufferWritingResultPartition partition,
 			boolean isSlowProducer,
 			TestProducerSource source) {
 
@@ -69,10 +70,11 @@ public class TestPartitionProducer implements Callable<Boolean> {
 		boolean success = false;
 
 		try {
-			BufferConsumerAndChannel consumerAndChannel;
+			BufferAndChannel bufferAndChannel;
 
-			while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
-				partition.addBufferConsumer(consumerAndChannel.getBufferConsumer(), consumerAndChannel.getTargetChannel());
+			while ((bufferAndChannel = source.getNextBuffer()) != null) {
+				ByteBuffer record = ByteBuffer.wrap(bufferAndChannel.getBuffer());
+				partition.emitRecord(record, bufferAndChannel.getTargetChannel());
 
 				// Check the interrupted flag after adding data to prevent resource leaks
 				if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
index f5d97f5..99006e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
@@ -29,19 +29,19 @@ public interface TestProducerSource {
 	 *
 	 * <p> The channel index specifies the subpartition to add the data to.
 	 */
-	BufferConsumerAndChannel getNextBufferConsumer() throws Exception;
+	BufferAndChannel getNextBuffer() throws Exception;
 
-	class BufferConsumerAndChannel {
-		private final BufferConsumer bufferConsumer;
+	class BufferAndChannel {
+		private final byte[] buffer;
 		private final int targetChannel;
 
-		public BufferConsumerAndChannel(BufferConsumer bufferConsumer, int targetChannel) {
-			this.bufferConsumer = checkNotNull(bufferConsumer);
+		public BufferAndChannel(byte[] buffer, int targetChannel) {
+			this.buffer = checkNotNull(buffer);
 			this.targetChannel = targetChannel;
 		}
 
-		public BufferConsumer getBufferConsumer() {
-			return bufferConsumer;
+		public byte[] getBuffer() {
+			return buffer;
 		}
 
 		public int getTargetChannel() {
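[Note on the new TestProducerSource contract: implementations now return raw
byte arrays and leave all buffer handling to the producer. A hypothetical
one-shot source under the new interface; the variable name and sizes are
illustrative:

    TestProducerSource oneRecordSource = new TestProducerSource() {
        private boolean done;

        @Override
        public BufferAndChannel getNextBuffer() {
            if (done) {
                return null; // null terminates the producer loop
            }
            done = true;
            // One 4-byte record, targeted at subpartition 0.
            return new BufferAndChannel(new byte[4], 0);
        }
    };

TestPartitionProducer wraps the returned bytes in a ByteBuffer and calls
emitRecord(), while TestSubpartitionProducer wraps them in a MemorySegment
and adds a BufferConsumer, as the surrounding hunks show.]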
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
index 60eee34..a14cdba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.runtime.io.network.util;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
 
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -69,10 +73,11 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
 		boolean success = false;
 
 		try {
-			BufferConsumerAndChannel consumerAndChannel;
+			BufferAndChannel bufferAndChannel;
 
-			while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
-				subpartition.add(consumerAndChannel.getBufferConsumer());
+			while ((bufferAndChannel = source.getNextBuffer()) != null) {
+				MemorySegment segment = MemorySegmentFactory.wrap(bufferAndChannel.getBuffer());
+				subpartition.add(new BufferConsumer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER));
 
 				// Check the interrupted flag after adding data to prevent resource leaks
 				if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2865204..a28a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -202,7 +202,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	public void addOutput(final List<Record> outputList) {
 		try {
-			outputs.add(new RecordCollectingResultPartitionWriter(outputList, new TestPooledBufferProvider(Integer.MAX_VALUE)));
+			outputs.add(new RecordCollectingResultPartitionWriter(outputList));
 		}
 		catch (Throwable t) {
 			t.printStackTrace();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 082ddc5..a297460 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,11 +21,11 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.B
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
+import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -81,7 +82,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 		for (int i = 0; i < numInputChannels; i++) {
 			final int channelIndex = i;
-			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
+			final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
 			final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
 				new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));
 
@@ -107,10 +108,10 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 					Object inputElement = input.getStreamRecord();
 
 					delegate.setInstance(inputElement);
-					recordSerializer.serializeRecord(delegate);
+					ByteBuffer serializedRecord = RecordWriter.serializeRecord(dataOutputSerializer, delegate);
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
 					BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-					recordSerializer.copyToBufferBuilder(bufferBuilder);
+					bufferBuilder.appendAndCommit(serializedRecord);
 					bufferBuilder.finish();
 
 					// Call getCurrentBuffer to ensure size is set
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 4c68ec3..48f833d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
@@ -27,9 +28,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -53,6 +53,7 @@ import org.junit.After;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -218,14 +219,15 @@ public class StreamTaskNetworkInputTest {
 	}
 
 	private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
-		RecordSerializer<SerializationDelegate<StreamElement>> serializer = new SpanningRecordSerializer<>();
+		DataOutputSerializer serializer = new DataOutputSerializer(128);
 		SerializationDelegate<StreamElement> serializationDelegate =
 			new SerializationDelegate<>(
 				new StreamElementSerializer<>(LongSerializer.INSTANCE));
 		serializationDelegate.setInstance(new StreamRecord<>(value));
-		serializer.serializeRecord(serializationDelegate);
+		ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, serializationDelegate);
+		bufferBuilder.appendAndCommit(serializedRecord);
 
-		assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer());
+		assertFalse(bufferBuilder.isFull());
 	}
 
 	private static void assertHasNextElement(StreamTaskNetworkInput input, DataOutput output) throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bf2b79c..447b9a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -204,7 +203,6 @@ public class StreamMockEnvironment implements Environment {
 		try {
 			outputs.add(new RecordOrEventCollectingResultPartitionWriter<T>(
 				outputList,
-				new TestPooledBufferProvider(Integer.MAX_VALUE),
 				serializer));
 		}
 		catch (Throwable t) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 00ef9a1..87142d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -252,11 +251,10 @@ public class SubtaskCheckpointCoordinatorTest {
 			.setEnvironment(mockEnvironment)
 			.build();
 
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096);
 		ArrayList<Object> recordOrEvents = new ArrayList<>();
 		StreamElementSerializer<String> stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE);
 		ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>(
-			recordOrEvents, bufferProvider, stringStreamElementSerializer);
+			recordOrEvents, stringStreamElementSerializer);
 		mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
 
 		OneInputStreamTask<String, String> task = testHarness.getTask();