You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:15 UTC

[flink] 04/07: [FLINK-19297][network] Make ResultPartitionWriter record-oriented

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 900a896922403f4c1538d4b785d17b7cf72abcd7
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Wed Aug 26 14:22:25 2020 +0800

    [FLINK-19297][network] Make ResultPartitionWriter record-oriented
    
    Currently, the ResultPartitionWriter is buffer-oriented, that is, RecordWriter can only add buffers of different channels to ResultPartitionWriter and the buffer boundary serves as a natural boundary of data belonging to different channels. However, this abstraction is not flexible enough to handle new implementations like sort-based partitioning where records are appended to a joint structure shared by all channels and sorting is used to cluster data belonging to different channels. This [...]
---
 .../api/serialization/RecordSerializer.java        |  94 ------
 .../serialization/SpanningRecordSerializer.java    | 114 --------
 .../network/api/writer/BroadcastRecordWriter.java  | 129 +-------
 .../api/writer/ChannelSelectorRecordWriter.java    |  82 +-----
 .../io/network/api/writer/RecordWriter.java        | 171 +++--------
 .../network/api/writer/ResultPartitionWriter.java  |  55 ++--
 .../partition/BufferWritingResultPartition.java    | 179 ++++++++++--
 .../io/network/partition/ResultPartition.java      |  21 +-
 .../operators/shipping/OutputCollector.java        |   3 -
 ...bleNotifyingResultPartitionWriterDecorator.java |  64 ++--
 .../SpanningRecordSerializationTest.java           |  65 +++--
 .../SpanningRecordSerializerTest.java              | 196 -------------
 .../AbstractCollectingResultPartitionWriter.java   |  70 ++---
 .../api/writer/BroadcastRecordWriterTest.java      |  77 +++--
 .../RecordCollectingResultPartitionWriter.java     |   4 +-
 ...cordOrEventCollectingResultPartitionWriter.java |  35 +--
 .../api/writer/RecordWriterDelegateTest.java       |  53 ++--
 .../io/network/api/writer/RecordWriterTest.java    | 325 +++++----------------
 .../network/netty/PartitionRequestQueueTest.java   |  21 +-
 .../partition/MockResultPartitionWriter.java       |  34 ++-
 .../PartialConsumePipelinedResultTest.java         |   7 +-
 .../io/network/partition/PartitionTestUtils.java   |  12 -
 .../partition/PipelinedSubpartitionTest.java       |  26 +-
 .../io/network/partition/ResultPartitionTest.java  | 252 ++++++++++------
 .../IteratorWrappingTestSingleInputGate.java       |  13 +-
 .../partition/consumer/LocalInputChannelTest.java  |  26 +-
 .../partition/consumer/SingleInputGateTest.java    |   6 +-
 .../io/network/util/TestPartitionProducer.java     |  16 +-
 .../io/network/util/TestProducerSource.java        |  14 +-
 .../io/network/util/TestSubpartitionProducer.java  |  13 +-
 .../operators/testutils/MockEnvironment.java       |   2 +-
 .../consumer/StreamTestSingleInputGate.java        |  11 +-
 .../runtime/io/StreamTaskNetworkInputTest.java     |  12 +-
 .../runtime/tasks/StreamMockEnvironment.java       |   2 -
 .../tasks/SubtaskCheckpointCoordinatorTest.java    |   4 +-
 35 files changed, 785 insertions(+), 1423 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
deleted file mode 100644
index 6f73917..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-
-/**
- * Interface for turning records into sequences of memory segments.
- */
-public interface RecordSerializer<T extends IOReadableWritable> {
-
-	/**
-	 * Status of the serialization result.
-	 */
-	enum SerializationResult {
-		PARTIAL_RECORD_MEMORY_SEGMENT_FULL(false, true),
-		FULL_RECORD_MEMORY_SEGMENT_FULL(true, true),
-		FULL_RECORD(true, false);
-
-		private final boolean isFullRecord;
-
-		private final boolean isFullBuffer;
-
-		SerializationResult(boolean isFullRecord, boolean isFullBuffer) {
-			this.isFullRecord = isFullRecord;
-			this.isFullBuffer = isFullBuffer;
-		}
-
-		/**
-		 * Whether the full record was serialized and completely written to
-		 * a target buffer.
-		 *
-		 * @return <tt>true</tt> if the complete record was written
-		 */
-		public boolean isFullRecord() {
-			return this.isFullRecord;
-		}
-
-		/**
-		 * Whether the target buffer is full after the serialization process.
-		 *
-		 * @return <tt>true</tt> if the target buffer is full
-		 */
-		public boolean isFullBuffer() {
-			return this.isFullBuffer;
-		}
-	}
-
-	/**
-	 * Starts serializing the given record to an intermediate data buffer.
-	 *
-	 * @param record the record to serialize
-	 */
-	void serializeRecord(T record) throws IOException;
-
-	/**
-	 * Copies the intermediate data serialization buffer to the given target buffer.
-	 *
-	 * @param bufferBuilder the new target buffer to use
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
-	 */
-	SerializationResult copyToBufferBuilder(BufferBuilder bufferBuilder);
-
-	/**
-	 * Supports copying an intermediate data serialization buffer to multiple target buffers
-	 * by resetting its initial position before each copying.
-	 */
-	void reset();
-
-	/**
-	 * @return <tt>true</tt> if has some serialized data pending copying to the result {@link BufferBuilder}.
-	 */
-	boolean hasSerializedData();
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
deleted file mode 100644
index 1bb9ec8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataOutputSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-/**
- * Record serializer which serializes the complete record to an intermediate
- * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #copyToBufferBuilder(BufferBuilder)}.
- *
- * @param <T> The type of the records that are serialized.
- */
-public class SpanningRecordSerializer<T extends IOReadableWritable> implements RecordSerializer<T> {
-
-	/** Flag to enable/disable checks, if buffer not set/full or pending serialization. */
-	private static final boolean CHECKED = false;
-
-	/** Intermediate data serialization. */
-	private final DataOutputSerializer serializationBuffer;
-
-	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
-	private ByteBuffer dataBuffer;
-
-	public SpanningRecordSerializer() {
-		serializationBuffer = new DataOutputSerializer(128);
-
-		// ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic)
-		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-	}
-
-	/**
-	 * Serializes the complete record to an intermediate data serialization buffer.
-	 *
-	 * @param record the record to serialize
-	 */
-	@Override
-	public void serializeRecord(T record) throws IOException {
-		if (CHECKED) {
-			if (dataBuffer.hasRemaining()) {
-				throw new IllegalStateException("Pending serialization of previous record.");
-			}
-		}
-
-		serializationBuffer.clear();
-		// the initial capacity of the serialization buffer should be no less than 4
-		serializationBuffer.skipBytesToWrite(4);
-
-		// write data and length
-		record.write(serializationBuffer);
-
-		int len = serializationBuffer.length() - 4;
-		serializationBuffer.setPosition(0);
-		serializationBuffer.writeInt(len);
-		serializationBuffer.skipBytesToWrite(len);
-
-		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-	}
-
-	/**
-	 * Copies an intermediate data serialization buffer into the target BufferBuilder.
-	 *
-	 * @param targetBuffer the target BufferBuilder to copy to
-	 * @return how much information was written to the target buffer and
-	 *         whether this buffer is full
-	 */
-	@Override
-	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
-		targetBuffer.append(dataBuffer);
-		targetBuffer.commit();
-
-		return getSerializationResult(targetBuffer);
-	}
-
-	private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
-		if (dataBuffer.hasRemaining()) {
-			return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
-		}
-		return !targetBuffer.isFull()
-			? SerializationResult.FULL_RECORD
-			: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
-	}
-
-	@Override
-	public void reset() {
-		dataBuffer.position(0);
-	}
-
-	@Override
-	public boolean hasSerializedData() {
-		return dataBuffer.hasRemaining();
-	}
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index b834738..ec800dc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -18,40 +18,20 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-
-import javax.annotation.Nullable;
 
 import java.io.IOException;
 
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A special record-oriented runtime result writer only for broadcast mode.
  *
- * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and maintain a single {@link BufferBuilder}
- * for all the channels. Then the serialization results need be copied only once to this buffer which would be
- * shared for all the channels in a more efficient way.
+ * <p>The BroadcastRecordWriter extends the {@link RecordWriter} and emits records to all channels for
+ * regular {@link #emit(IOReadableWritable)}.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
 public final class BroadcastRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
-	/** The current buffer builder shared for all the channels. */
-	@Nullable
-	private BufferBuilder bufferBuilder;
-
-	/**
-	 * The flag for judging whether {@link #requestNewBufferBuilder(int)} and {@link #flushTargetPartition(int)}
-	 * is triggered by {@link #randomEmit(IOReadableWritable)} or not.
-	 */
-	private boolean randomTriggered;
-
-	private BufferConsumer randomTriggeredConsumer;
-
 	BroadcastRecordWriter(
 			ResultPartitionWriter writer,
 			long timeout,
@@ -60,113 +40,18 @@ public final class BroadcastRecordWriter<T extends IOReadableWritable> extends R
 	}
 
 	@Override
-	public void emit(T record) throws IOException, InterruptedException {
+	public void emit(T record) throws IOException {
 		broadcastEmit(record);
 	}
 
 	@Override
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		randomEmit(record, rng.nextInt(numberOfChannels));
-	}
-
-	/**
-	 * For non-broadcast emit, we try to finish the current {@link BufferBuilder} first, and then request
-	 * a new {@link BufferBuilder} for the random channel. If this new {@link BufferBuilder} is not full,
-	 * it can be shared for all the other channels via initializing readable position in created
-	 * {@link BufferConsumer}.
-	 */
-	@VisibleForTesting
-	void randomEmit(T record, int targetChannelIndex) throws IOException, InterruptedException {
-		tryFinishCurrentBufferBuilder(targetChannelIndex);
-
-		randomTriggered = true;
-		emit(record, targetChannelIndex);
-		randomTriggered = false;
-
-		if (bufferBuilder != null) {
-			for (int index = 0; index < numberOfChannels; index++) {
-				if (index != targetChannelIndex) {
-					addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()), index);
-				}
-			}
-		}
-	}
+	public void broadcastEmit(T record) throws IOException {
+		checkErroneous();
 
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
-		// We could actually select any target channel here because all the channels
-		// are sharing the same BufferBuilder in broadcast mode.
-		emit(record, 0);
-	}
+		targetPartition.broadcastRecord(serializeRecord(serializer, record));
 
-	/**
-	 * The flush could be triggered by {@link #randomEmit(IOReadableWritable)}, {@link #emit(IOReadableWritable)}
-	 * or {@link #broadcastEmit(IOReadableWritable)}. Only random emit should flush a single target channel,
-	 * otherwise we should flush all the channels.
-	 */
-	@Override
-	public void flushTargetPartition(int targetChannel) {
-		if (randomTriggered) {
-			super.flushTargetPartition(targetChannel);
-		} else {
+		if (flushAlways) {
 			flushAll();
 		}
 	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return bufferBuilder != null ? bufferBuilder : requestNewBufferBuilder(targetChannel);
-	}
-
-	/**
-	 * The request could be from broadcast or non-broadcast modes like {@link #randomEmit(IOReadableWritable)}.
-	 *
-	 * <p>For non-broadcast, the created {@link BufferConsumer} is only for the target channel.
-	 *
-	 * <p>For broadcast, all the channels share the same requested {@link BufferBuilder} and the created
-	 * {@link BufferConsumer} is copied for every channel.
-	 */
-	@Override
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(bufferBuilder == null || bufferBuilder.isFinished());
-
-		BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
-		if (randomTriggered) {
-			addBufferConsumer(randomTriggeredConsumer = builder.createBufferConsumer(), targetChannel);
-		} else {
-			try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
-				for (int channel = 0; channel < numberOfChannels; channel++) {
-					addBufferConsumer(bufferConsumer.copy(), channel);
-				}
-			}
-		}
-
-		bufferBuilder = builder;
-		return builder;
-	}
-
-	@Override
-	public void tryFinishCurrentBufferBuilder(int targetChannel) {
-		if (bufferBuilder == null) {
-			return;
-		}
-
-		BufferBuilder builder = bufferBuilder;
-		bufferBuilder = null;
-
-		finishBufferBuilder(builder);
-	}
-
-	@Override
-	public void emptyCurrentBufferBuilder(int targetChannel) {
-		bufferBuilder = null;
-	}
-
-	@Override
-	public void closeBufferBuilders() {
-		if (bufferBuilder != null) {
-			bufferBuilder.finish();
-			bufferBuilder = null;
-		}
-	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
index 5e32056..b94207e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ChannelSelectorRecordWriter.java
@@ -19,19 +19,17 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * A regular record-oriented runtime result writer.
  *
- * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and maintains an array of
- * {@link BufferBuilder}s for all the channels. The {@link #emit(IOReadableWritable)}
- * operation is based on {@link ChannelSelector} to select the target channel.
+ * <p>The ChannelSelectorRecordWriter extends the {@link RecordWriter} and emits records to the channel
+ * selected by the {@link ChannelSelector} for regular {@link #emit(IOReadableWritable)}.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
@@ -39,9 +37,6 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 
 	private final ChannelSelector<T> channelSelector;
 
-	/** Every subpartition maintains a separate buffer builder which might be null. */
-	private final BufferBuilder[] bufferBuilders;
-
 	ChannelSelectorRecordWriter(
 			ResultPartitionWriter writer,
 			ChannelSelector<T> channelSelector,
@@ -51,77 +46,28 @@ public final class ChannelSelectorRecordWriter<T extends IOReadableWritable> ext
 
 		this.channelSelector = checkNotNull(channelSelector);
 		this.channelSelector.setup(numberOfChannels);
-
-		this.bufferBuilders = new BufferBuilder[numberOfChannels];
 	}
 
 	@Override
-	public void emit(T record) throws IOException, InterruptedException {
+	public void emit(T record) throws IOException {
 		emit(record, channelSelector.selectChannel(record));
 	}
 
 	@Override
-	public void randomEmit(T record) throws IOException, InterruptedException {
-		emit(record, rng.nextInt(numberOfChannels));
-	}
-
-	/**
-	 * The record is serialized into intermediate serialization buffer which is then copied
-	 * into the target buffer for every channel.
-	 */
-	@Override
-	public void broadcastEmit(T record) throws IOException, InterruptedException {
+	public void broadcastEmit(T record) throws IOException {
 		checkErroneous();
 
-		serializer.serializeRecord(record);
-
-		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-			copyFromSerializerToTargetChannel(targetChannel);
-		}
-	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		if (bufferBuilders[targetChannel] != null) {
-			return bufferBuilders[targetChannel];
-		} else {
-			return requestNewBufferBuilder(targetChannel);
-		}
-	}
-
-	@Override
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
-
-		BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);
-		addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
-		bufferBuilders[targetChannel] = bufferBuilder;
-		return bufferBuilder;
-	}
-
-	@Override
-	public void tryFinishCurrentBufferBuilder(int targetChannel) {
-		if (bufferBuilders[targetChannel] == null) {
-			return;
+		// Emitting to all channels in a for loop can be better than calling
+		// ResultPartitionWriter#broadcastRecord because the broadcastRecord
+		// method incurs extra overhead.
+		ByteBuffer serializedRecord = serializeRecord(serializer, record);
+		for (int channelIndex = 0; channelIndex < numberOfChannels; channelIndex++) {
+			serializedRecord.rewind();
+			emit(record, channelIndex);
 		}
-		BufferBuilder bufferBuilder = bufferBuilders[targetChannel];
-		bufferBuilders[targetChannel] = null;
 
-		finishBufferBuilder(bufferBuilder);
-	}
-
-	@Override
-	public void emptyCurrentBufferBuilder(int targetChannel) {
-		bufferBuilders[targetChannel] = null;
-	}
-
-	@Override
-	public void closeBufferBuilders() {
-		for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-			if (bufferBuilders[targetChannel] != null) {
-				bufferBuilders[targetChannel].finish();
-				bufferBuilders[targetChannel] = null;
-			}
+		if (flushAlways) {
+			flushAll();
 		}
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 995e670..b58ef38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -20,17 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Meter;
-import org.apache.flink.metrics.MeterView;
-import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
@@ -40,18 +32,17 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * An abstract record-oriented runtime result writer.
  *
  * <p>The RecordWriter wraps the runtime's {@link ResultPartitionWriter} and takes care of
- * serializing records into buffers.
+ * channel selection and serializing records into bytes.
  *
  * @param <T> the type of the record that can be emitted with this record writer
  */
@@ -63,21 +54,15 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 
 	private static final Logger LOG = LoggerFactory.getLogger(RecordWriter.class);
 
-	private final ResultPartitionWriter targetPartition;
+	protected final ResultPartitionWriter targetPartition;
 
 	protected final int numberOfChannels;
 
-	protected final RecordSerializer<T> serializer;
+	protected final DataOutputSerializer serializer;
 
 	protected final Random rng = new XORShiftRandom();
 
-	private Counter numBytesOut = new SimpleCounter();
-
-	private Counter numBuffersOut = new SimpleCounter();
-
-	protected Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
-
-	private final boolean flushAlways;
+	protected final boolean flushAlways;
 
 	/** The thread that periodically flushes the output, to give an upper latency bound. */
 	@Nullable
@@ -93,7 +78,7 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		this.targetPartition = writer;
 		this.numberOfChannels = writer.getNumberOfSubpartitions();
 
-		this.serializer = new SpanningRecordSerializer<T>();
+		this.serializer = new DataOutputSerializer(128);
 
 		checkArgument(timeout >= -1);
 		this.flushAlways = (timeout == 0);
@@ -109,89 +94,58 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		}
 	}
 
-	protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
+	protected void emit(T record, int targetSubpartition) throws IOException {
 		checkErroneous();
 
-		serializer.serializeRecord(record);
+		targetPartition.emitRecord(serializeRecord(serializer, record), targetSubpartition);
 
-		// Make sure we don't hold onto the large intermediate serialization buffer for too long
-		copyFromSerializerToTargetChannel(targetChannel);
+		if (flushAlways) {
+			targetPartition.flush(targetSubpartition);
+		}
 	}
 
-	/**
-	 * @param targetChannel
-	 * @return <tt>true</tt> if the intermediate serialization buffer should be pruned
-	 */
-	protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
-		// We should reset the initial position of the intermediate serialization buffer before
-		// copying, so the serialization results can be copied to multiple target buffers.
-		serializer.reset();
-
-		boolean pruneTriggered = false;
-		BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
-		SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
-		while (result.isFullBuffer()) {
-			finishBufferBuilder(bufferBuilder);
-
-			// If this was a full record, we are done. Not breaking out of the loop at this point
-			// will lead to another buffer request before breaking out (that would not be a
-			// problem per se, but it can lead to stalls in the pipeline).
-			if (result.isFullRecord()) {
-				pruneTriggered = true;
-				emptyCurrentBufferBuilder(targetChannel);
-				break;
-			}
+	public void broadcastEvent(AbstractEvent event) throws IOException {
+		broadcastEvent(event, false);
+	}
 
-			bufferBuilder = requestNewBufferBuilder(targetChannel);
-			result = serializer.copyToBufferBuilder(bufferBuilder);
-		}
-		checkState(!serializer.hasSerializedData(), "All data should be written at once");
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		targetPartition.broadcastEvent(event, isPriorityEvent);
 
 		if (flushAlways) {
-			flushTargetPartition(targetChannel);
+			flushAll();
 		}
-		return pruneTriggered;
 	}
 
-	public void broadcastEvent(AbstractEvent event) throws IOException {
-		broadcastEvent(event, false);
-	}
+	@VisibleForTesting
+	public static ByteBuffer serializeRecord(
+			DataOutputSerializer serializer,
+			IOReadableWritable record) throws IOException {
+		serializer.clear();
 
-	public void broadcastEvent(AbstractEvent event, boolean hasPriority) throws IOException {
-		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, hasPriority)) {
-			for (int targetChannel = 0; targetChannel < numberOfChannels; targetChannel++) {
-				tryFinishCurrentBufferBuilder(targetChannel);
+		// the initial capacity should be no less than 4 bytes
+		serializer.skipBytesToWrite(4);
 
-				// Retain the buffer so that it can be recycled by each channel of targetPartition
-				targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);
-			}
+		// write data
+		record.write(serializer);
 
-			if (flushAlways) {
-				flushAll();
-			}
-		}
+		// write length
+		int len = serializer.length() - 4;
+		serializer.setPosition(0);
+		serializer.writeInt(len);
+		serializer.skipBytesToWrite(len);
+
+		return serializer.wrapAsByteBuffer();
 	}
 
 	public void flushAll() {
 		targetPartition.flushAll();
 	}
 
-	protected void flushTargetPartition(int targetChannel) {
-		targetPartition.flush(targetChannel);
-	}
-
 	/**
 	 * Sets the metric group for this RecordWriter.
      */
 	public void setMetricGroup(TaskIOMetricGroup metrics) {
-		numBytesOut = metrics.getNumBytesOutCounter();
-		numBuffersOut = metrics.getNumBuffersOutCounter();
-		idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
-	}
-
-	protected void finishBufferBuilder(BufferBuilder bufferBuilder) {
-		numBytesOut.inc(bufferBuilder.finish());
-		numBuffersOut.inc();
+		targetPartition.setMetricGroup(metrics);
 	}
 
 	@Override
@@ -202,44 +156,27 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 	/**
 	 * This is used to send regular records.
 	 */
-	public abstract void emit(T record) throws IOException, InterruptedException;
+	public abstract void emit(T record) throws IOException;
 
 	/**
 	 * This is used to send LatencyMarks to a random target channel.
 	 */
-	public abstract void randomEmit(T record) throws IOException, InterruptedException;
-
-	/**
-	 * This is used to broadcast streaming Watermarks in-band with records.
-	 */
-	public abstract void broadcastEmit(T record) throws IOException, InterruptedException;
-
-	/**
-	 * The {@link BufferBuilder} may already exist if not filled up last time, otherwise we need
-	 * request a new one for this target channel.
-	 */
-	abstract BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
-
-	/**
-	 * Marks the current {@link BufferBuilder} as finished if present and clears the state for next one.
-	 */
-	abstract void tryFinishCurrentBufferBuilder(int targetChannel);
+	public void randomEmit(T record) throws IOException {
+		checkErroneous();
 
-	/**
-	 * Marks the current {@link BufferBuilder} as empty for the target channel.
-	 */
-	abstract void emptyCurrentBufferBuilder(int targetChannel);
+		int targetSubpartition = rng.nextInt(numberOfChannels);
+		emit(record, targetSubpartition);
+	}
 
 	/**
-	 * Marks the current {@link BufferBuilder}s as finished and releases the resources.
+	 * This is used to broadcast streaming Watermarks in-band with records.
 	 */
-	abstract void closeBufferBuilders();
+	public abstract void broadcastEmit(T record) throws IOException;
 
 	/**
 	 * Closes the writer. This stops the flushing thread (if there is one).
 	 */
 	public void close() {
-		closeBufferBuilders();
 		// make sure we terminate the thread in any case
 		if (outputFlusher != null) {
 			outputFlusher.terminate();
@@ -277,28 +214,6 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 		}
 	}
 
-	protected void addBufferConsumer(BufferConsumer consumer, int targetChannel) throws IOException {
-		targetPartition.addBufferConsumer(consumer, targetChannel);
-	}
-
-	/**
-	 * Requests a new {@link BufferBuilder} for the target channel and returns it.
-	 */
-	public BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		BufferBuilder builder = targetPartition.tryGetBufferBuilder(targetChannel);
-		if (builder == null) {
-			long start = System.currentTimeMillis();
-			builder = targetPartition.getBufferBuilder(targetChannel);
-			idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
-		}
-		return builder;
-	}
-
-	@VisibleForTesting
-	public Meter getIdleTimeMsPerSecond() {
-		return idleTimeMsPerSecond;
-	}
-
 	// ------------------------------------------------------------------------
 
 	/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 838850e..a90b0ac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,19 +18,20 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.AvailabilityProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
- * A buffer-oriented runtime result writer API for producing results.
+ * A record-oriented runtime result writer API for producing results.
  *
  * <p>If {@link ResultPartitionWriter#close()} is called before {@link ResultPartitionWriter#fail(Throwable)} or
  * {@link ResultPartitionWriter#finish()}, it abruptly triggers failure and cancellation of production.
@@ -51,30 +52,27 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	int getNumTargetKeyGroups();
 
 	/**
-	 * Requests a {@link BufferBuilder} from this partition for writing data.
+	 * Writes the given serialized record to the target subpartition.
 	 */
-	BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException;
+	void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException;
 
+	/**
+	 * Writes the given serialized record to all subpartitions. The same effect can be achieved by emitting
+	 * the same record to each subpartition one by one; however, this method can perform better because the
+	 * underlying implementation can apply optimizations, for example copying the given serialized record only
+	 * once to a shared channel which can be consumed by all subpartitions.
+	 */
+	void broadcastRecord(ByteBuffer record) throws IOException;
 
 	/**
-	 * Try to request a {@link BufferBuilder} from this partition for writing data.
-	 *
-	 * <p>Returns <code>null</code> if no buffer is available or the buffer provider has been destroyed.
+	 * Writes the given {@link AbstractEvent} to all channels.
 	 */
-	BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException;
+	void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException;
 
 	/**
-	 * Adds the bufferConsumer to the subpartition with the given index.
-	 *
-	 * <p>This method takes the ownership of the passed {@code bufferConsumer} and thus is responsible for releasing
-	 * it's resources.
-	 *
-	 * <p>To avoid problems with data re-ordering, before adding new {@link BufferConsumer} the previously added one
-	 * the given {@code subpartitionIndex} must be marked as {@link BufferConsumer#isFinished()}.
-	 *
-	 * @return true if operation succeeded and bufferConsumer was enqueued for consumption.
+	 * Sets the metric group for the {@link ResultPartitionWriter}.
 	 */
-	boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException;
+	void setMetricGroup(TaskIOMetricGroup metrics);
 
 	/**
 	 * Returns a reader for the subpartition with the given index.
@@ -82,12 +80,12 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException;
 
 	/**
-	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in all subpartitions.
+	 * Manually trigger the consumption of data from all subpartitions.
 	 */
 	void flushAll();
 
 	/**
-	 * Manually trigger consumption from enqueued {@link BufferConsumer BufferConsumers} in one specified subpartition.
+	 * Manually trigger the consumption of data from the given subpartition.
 	 */
 	void flush(int subpartitionIndex);
 
@@ -108,4 +106,19 @@ public interface ResultPartitionWriter extends AutoCloseable, AvailabilityProvid
 	 * <p>Closing of partition is still needed afterwards.
 	 */
 	void finish() throws IOException;
+
+	boolean isFinished();
+
+	/**
+	 * Releases the partition writer, which releases the produced data so that no reader can consume the
+	 * partition any more.
+	 */
+	void release(Throwable cause);
+
+	boolean isReleased();
+
+	/**
+	 * Closes the partition writer, which releases the allocated resources, for example the buffer pool.
+	 */
+	void close() throws Exception;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index 437d0a0..bd9e8ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -19,16 +19,23 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkElementIndex;
@@ -49,6 +56,14 @@ public class BufferWritingResultPartition extends ResultPartition {
 	/** The subpartitions of this partition. At least one. */
 	protected final ResultSubpartition[] subpartitions;
 
+	/** For non-broadcast mode, each subpartition maintains a separate BufferBuilder which might be null. */
+	private final BufferBuilder[] subpartitionBufferBuilders;
+
+	/** For broadcast mode, a single BufferBuilder is shared by all subpartitions. */
+	private BufferBuilder broadcastBufferBuilder;
+
+	private Meter idleTimeMsPerSecond = new MeterView(new SimpleCounter());
+
 	public BufferWritingResultPartition(
 		String owningTaskName,
 		int partitionIndex,
@@ -72,6 +87,7 @@ public class BufferWritingResultPartition extends ResultPartition {
 			bufferPoolFactory);
 
 		this.subpartitions = checkNotNull(subpartitions);
+		this.subpartitionBufferBuilders = new BufferBuilder[subpartitions.length];
 	}
 
 	public int getNumberOfQueuedBuffers() {
@@ -90,46 +106,58 @@ public class BufferWritingResultPartition extends ResultPartition {
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		checkInProduceState();
-
-		return bufferPool.requestBufferBuilderBlocking(targetChannel);
+	public void flushAll() {
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
 	}
 
 	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return bufferPool.requestBufferBuilder(targetChannel);
+	public void flush(int targetSubpartition) {
+		subpartitions[targetSubpartition].flush();
 	}
 
-	@Override
-	public boolean addBufferConsumer(
-			BufferConsumer bufferConsumer,
-			int subpartitionIndex) throws IOException {
-		checkNotNull(bufferConsumer);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		do {
+			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
+			bufferBuilder.appendAndCommit(record);
 
-		ResultSubpartition subpartition;
-		try {
-			checkInProduceState();
-			subpartition = subpartitions[subpartitionIndex];
-		}
-		catch (Exception ex) {
-			bufferConsumer.close();
-			throw ex;
-		}
+			if (bufferBuilder.isFull()) {
+				finishSubpartitionBufferBuilder(targetSubpartition);
+			}
+		} while (record.hasRemaining());
+	}
+
+	@Override
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		do {
+			final BufferBuilder bufferBuilder = getBroadcastBufferBuilder();
+			bufferBuilder.appendAndCommit(record);
 
-		return subpartition.add(bufferConsumer);
+			if (bufferBuilder.isFull()) {
+				finishBroadcastBufferBuilder();
+			}
+		} while (record.hasRemaining());
 	}
 
 	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		checkInProduceState();
+		finishBroadcastBufferBuilder();
+		finishSubpartitionBufferBuilders();
+
+		try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event, isPriorityEvent)) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				// Retain the buffer so that it can be recycled by each channel of targetPartition
+				subpartition.add(eventBufferConsumer.copy());
+			}
 		}
 	}
 
 	@Override
-	public void flush(int targetSubpartition) {
-		subpartitions[targetSubpartition].flush();
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		super.setMetricGroup(metrics);
+		idleTimeMsPerSecond = metrics.getIdleTimeMsPerSecond();
 	}
 
 	@Override
@@ -149,9 +177,13 @@ public class BufferWritingResultPartition extends ResultPartition {
 
 	@Override
 	public void finish() throws IOException {
+		finishBroadcastBufferBuilder();
+		finishSubpartitionBufferBuilders();
+
 		for (ResultSubpartition subpartition : subpartitions) {
 			subpartition.finish();
 		}
+
 		super.finish();
 	}
 
@@ -169,6 +201,101 @@ public class BufferWritingResultPartition extends ResultPartition {
 		}
 	}
 
+	private BufferBuilder getSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+		if (bufferBuilder != null) {
+			return bufferBuilder;
+		}
+
+		return getNewSubpartitionBufferBuilder(targetSubpartition);
+	}
+
+	private BufferBuilder getNewSubpartitionBufferBuilder(int targetSubpartition) throws IOException {
+		checkInProduceState();
+		ensureUnicastMode();
+
+		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(targetSubpartition);
+		subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
+		subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+		return bufferBuilder;
+	}
+
+	private BufferBuilder getBroadcastBufferBuilder() throws IOException {
+		if (broadcastBufferBuilder != null) {
+			return broadcastBufferBuilder;
+		}
+
+		return getNewBroadcastBufferBuilder();
+	}
+
+	private BufferBuilder getNewBroadcastBufferBuilder() throws IOException {
+		checkInProduceState();
+		ensureBroadcastMode();
+
+		final BufferBuilder bufferBuilder = requestNewBufferBuilderFromPool(0);
+		broadcastBufferBuilder = bufferBuilder;
+
+		try (final BufferConsumer consumer = bufferBuilder.createBufferConsumer()) {
+			for (ResultSubpartition subpartition : subpartitions) {
+				subpartition.add(consumer.copy());
+			}
+		}
+
+		return bufferBuilder;
+	}
+
+	private BufferBuilder requestNewBufferBuilderFromPool(int targetSubpartition) throws IOException {
+		BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetSubpartition);
+		if (bufferBuilder != null) {
+			return bufferBuilder;
+		}
+
+		final long start = System.currentTimeMillis();
+		try {
+			bufferBuilder = bufferPool.requestBufferBuilderBlocking(targetSubpartition);
+			idleTimeMsPerSecond.markEvent(System.currentTimeMillis() - start);
+			return bufferBuilder;
+		} catch (InterruptedException e) {
+			throw new IOException("Interrupted while waiting for buffer");
+		}
+	}
+
+	private void finishSubpartitionBufferBuilder(int targetSubpartition) {
+		final BufferBuilder bufferBuilder = subpartitionBufferBuilders[targetSubpartition];
+		if (bufferBuilder != null) {
+			numBytesOut.inc(bufferBuilder.finish());
+			numBuffersOut.inc();
+			subpartitionBufferBuilders[targetSubpartition] = null;
+		}
+	}
+
+	private void finishSubpartitionBufferBuilders() {
+		for (int channelIndex = 0; channelIndex < numSubpartitions; channelIndex++) {
+			finishSubpartitionBufferBuilder(channelIndex);
+		}
+	}
+
+	private void finishBroadcastBufferBuilder() {
+		if (broadcastBufferBuilder != null) {
+			numBytesOut.inc(broadcastBufferBuilder.finish() * numSubpartitions);
+			numBuffersOut.inc(numSubpartitions);
+			broadcastBufferBuilder = null;
+		}
+	}
+
+	private void ensureUnicastMode() {
+		finishBroadcastBufferBuilder();
+	}
+
+	private void ensureBroadcastMode() {
+		finishSubpartitionBufferBuilders();
+	}
+
+	@VisibleForTesting
+	public Meter getIdleTimeMsPerSecond() {
+		return idleTimeMsPerSecond;
+	}
+
 	@VisibleForTesting
 	public ResultSubpartition[] getAllPartitions() {
 		return subpartitions;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 02c91c5..bd81c19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -27,6 +29,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.FunctionWithException;
@@ -103,6 +106,10 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 	@Nullable
 	protected final BufferCompressor bufferCompressor;
 
+	protected Counter numBytesOut = new SimpleCounter();
+
+	protected Counter numBuffersOut = new SimpleCounter();
+
 	public ResultPartition(
 		String owningTaskName,
 		int partitionIndex,
@@ -202,13 +209,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		isFinished = true;
 	}
 
+	@Override
+	public boolean isFinished() {
+		return isFinished;
+	}
+
 	public void release() {
 		release(null);
 	}
 
-	/**
-	 * Releases the result partition.
-	 */
 	public void release(Throwable cause) {
 		if (isReleased.compareAndSet(false, true)) {
 			LOG.debug("{}: Releasing {}.", owningTaskName, this);
@@ -248,6 +257,12 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
 		return numTargetKeyGroups;
 	}
 
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		numBytesOut = metrics.getNumBytesOutCounter();
+		numBuffersOut = metrics.getNumBuffersOutCounter();
+	}
+
 	/**
 	 * Releases buffers held by this result partition.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
index d8cd9ec..a2dd4c9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/shipping/OutputCollector.java
@@ -68,9 +68,6 @@ public class OutputCollector<T> implements Collector<T> {
 			catch (IOException e) {
 				throw new RuntimeException("Emitting the record caused an I/O exception: " + e.getMessage(), e);
 			}
-			catch (InterruptedException e) {
-				throw new RuntimeException("Emitting the record was interrupted: " + e.getMessage(), e);
-			}
 		}
 		else {
 			throw new NullPointerException("The system does not support records that are null. "
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
index 3451939..3eca089 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ConsumableNotifyingResultPartitionWriterDecorator.java
@@ -21,17 +21,18 @@ package org.apache.flink.runtime.taskmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.CheckpointedResultPartition;
 import org.apache.flink.runtime.io.network.partition.CheckpointedResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
@@ -69,16 +70,6 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return partitionWriter.getBufferBuilder(targetChannel);
-	}
-
-	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return partitionWriter.tryGetBufferBuilder(targetChannel);
-	}
-
-	@Override
 	public ResultPartitionID getPartitionId() {
 		return partitionWriter.getPartitionId();
 	}
@@ -99,18 +90,34 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
-	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
-		return partitionWriter.createSubpartitionView(index, availabilityListener);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		partitionWriter.emitRecord(record, targetSubpartition);
+
+		notifyPipelinedConsumers();
 	}
 
 	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer,	int subpartitionIndex) throws IOException {
-		boolean success = partitionWriter.addBufferConsumer(bufferConsumer, subpartitionIndex);
-		if (success) {
-			notifyPipelinedConsumers();
-		}
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		partitionWriter.broadcastRecord(record);
+
+		notifyPipelinedConsumers();
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		partitionWriter.broadcastEvent(event, isPriorityEvent);
+
+		notifyPipelinedConsumers();
+	}
 
-		return success;
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
+		partitionWriter.setMetricGroup(metrics);
+	}
+
+	@Override
+	public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
+		return partitionWriter.createSubpartitionView(index, availabilityListener);
 	}
 
 	@Override
@@ -131,6 +138,21 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	}
 
 	@Override
+	public boolean isFinished() {
+		return partitionWriter.isFinished();
+	}
+
+	@Override
+	public void release(Throwable cause) {
+		partitionWriter.release(cause);
+	}
+
+	@Override
+	public boolean isReleased() {
+		return partitionWriter.isReleased();
+	}
+
+	@Override
 	public void fail(Throwable throwable) {
 		partitionWriter.fail(throwable);
 	}
@@ -152,7 +174,7 @@ public class ConsumableNotifyingResultPartitionWriterDecorator implements Result
 	 * this will trigger the deployment of consuming tasks after the first buffer has been added.
 	 */
 	private void notifyPipelinedConsumers() {
-		if (!hasNotifiedPipelinedConsumers) {
+		if (!hasNotifiedPipelinedConsumers && !partitionWriter.isReleased()) {
 			partitionConsumableNotifier.notifyPartitionConsumable(jobId, partitionWriter.getPartitionId(), taskActions);
 
 			hasNotifiedPipelinedConsumers = true;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index e35b311..9c748db 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -116,17 +118,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	// -----------------------------------------------------------------------------------------------------------------
 
 	private void testSerializationRoundTrip(Iterable<SerializationTestType> records, int segmentSize) throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
-		testSerializationRoundTrip(records, segmentSize, serializer, deserializer);
+		testSerializationRoundTrip(records, segmentSize, deserializer);
 	}
 
 	/**
-	 * Iterates over the provided records and tests whether {@link SpanningRecordSerializer} and {@link RecordDeserializer}
-	 * interact as expected.
+	 * Iterates over the provided records and tests whether {@link RecordWriter#serializeRecord} and
+	 * {@link RecordDeserializer} interact as expected.
 	 *
 	 * <p>Only a single {@link MemorySegment} will be allocated.
 	 *
@@ -136,14 +137,14 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	private static void testSerializationRoundTrip(
 			Iterable<SerializationTestType> records,
 			int segmentSize,
-			RecordSerializer<SerializationTestType> serializer,
 			RecordDeserializer<SerializationTestType> deserializer)
 		throws Exception {
+		final DataOutputSerializer serializer = new DataOutputSerializer(128);
 		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+		BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer.wrapAsByteBuffer(), segmentSize);
 
 		int numRecords = 0;
 		for (SerializationTestType record : records) {
@@ -153,18 +154,21 @@ public class SpanningRecordSerializationTest extends TestLogger {
 			numRecords++;
 
 			// serialize record
-			serializer.serializeRecord(record);
-			if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+			serializer.clear();
+			ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
+			serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+			if (serializationResult.getBufferBuilder().isFull()) {
 				// buffer is full => start deserializing
 				deserializer.setNextBuffer(serializationResult.buildBuffer());
 
 				numRecords -= DeserializationUtils.deserializeRecords(serializedRecords, deserializer);
 
 				// move buffers as long as necessary (for long records)
-				while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+				while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
 					deserializer.setNextBuffer(serializationResult.buildBuffer());
 				}
 			}
+			Assert.assertFalse(serializedRecord.hasRemaining());
 		}
 
 		// deserialize left over records
@@ -183,18 +187,16 @@ public class SpanningRecordSerializationTest extends TestLogger {
 
 		// assert that all records have been serialized and deserialized
 		Assert.assertEquals(0, numRecords);
-		Assert.assertFalse(serializer.hasSerializedData());
 		Assert.assertFalse(deserializer.hasUnfinishedData());
 	}
 
 	@Test
 	public void testSmallRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1024);
 	}
 
 	/**
@@ -202,33 +204,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	 */
 	@Test
 	public void testSpanningRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.INT), 1);
 	}
 
 	@Test
 	public void testLargeSpanningRecordUnconsumedBuffer() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
-		testUnconsumedBuffer(serializer, deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
+		testUnconsumedBuffer(deserializer, Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY), 1);
 	}
 
 	@Test
 	public void testLargeSpanningRecordUnconsumedBufferWithLeftOverBytes() throws Exception {
-		RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
 		RecordDeserializer<SerializationTestType> deserializer =
 			new SpillingAdaptiveSpanningRecordDeserializer<>(
 				new String[]{tempFolder.getRoot().getAbsolutePath()});
 
 		testUnconsumedBuffer(
-			serializer,
 			deserializer,
 			Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
 			1,
@@ -237,7 +235,6 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		deserializer.clear();
 
 		testUnconsumedBuffer(
-			serializer,
 			deserializer,
 			Util.randomRecord(SerializationTestTypeFactory.BYTE_ARRAY),
 			1,
@@ -245,17 +242,18 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	}
 
 	public void testUnconsumedBuffer(
-			RecordSerializer<SerializationTestType> serializer,
 			RecordDeserializer<SerializationTestType> deserializer,
 			SerializationTestType record,
 			int segmentSize,
 			byte... leftOverBytes) throws Exception {
 		try (ByteArrayOutputStream unconsumedBytes = new ByteArrayOutputStream()) {
-			serializer.serializeRecord(record);
+			DataOutputSerializer serializer = new DataOutputSerializer(128);
+			ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, record);
 
-			BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
+			BufferAndSerializerResult serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize);
 
-			if (serializer.copyToBufferBuilder(serializationResult.getBufferBuilder()).isFullBuffer()) {
+			serializationResult.getBufferBuilder().appendAndCommit(serializedRecord);
+			if (serializationResult.getBufferBuilder().isFull()) {
 				// buffer is full => start deserializing
 				Buffer buffer = serializationResult.buildBuffer();
 				writeBuffer(buffer.readOnlySlice().getNioBufferReadable(), unconsumedBytes);
@@ -265,7 +263,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 				deserializer.getNextRecord(record.getClass().newInstance());
 
 				// move buffers as long as necessary (for long records)
-				while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+				while ((serializationResult = setNextBufferForSerializer(serializedRecord, segmentSize)).isFullBuffer()) {
 					buffer = serializationResult.buildBuffer();
 
 					if (serializationResult.isFullRecord()) {
@@ -310,7 +308,7 @@ public class SpanningRecordSerializationTest extends TestLogger {
 	}
 
 	private static BufferAndSerializerResult setNextBufferForSerializer(
-			RecordSerializer<SerializationTestType> serializer,
+			ByteBuffer serializedRecord,
 			int segmentSize) throws IOException {
 		// create a bufferBuilder with some random starting offset to properly test handling buffer slices in the
 		// deserialization code.
@@ -319,24 +317,29 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
 		bufferConsumer.build().recycleBuffer();
 
+		bufferBuilder.appendAndCommit(serializedRecord);
 		return new BufferAndSerializerResult(
 			bufferBuilder,
 			bufferConsumer,
-			serializer.copyToBufferBuilder(bufferBuilder));
+			bufferBuilder.isFull(),
+			!serializedRecord.hasRemaining());
 	}
 
 	private static class BufferAndSerializerResult {
 		private final BufferBuilder bufferBuilder;
 		private final BufferConsumer bufferConsumer;
-		private final RecordSerializer.SerializationResult serializationResult;
+		private final boolean isFullBuffer;
+		private final boolean isFullRecord;
 
 		public BufferAndSerializerResult(
 				BufferBuilder bufferBuilder,
 				BufferConsumer bufferConsumer,
-				RecordSerializer.SerializationResult serializationResult) {
+				boolean isFullBuffer,
+				boolean isFullRecord) {
 			this.bufferBuilder = bufferBuilder;
 			this.bufferConsumer = bufferConsumer;
-			this.serializationResult = serializationResult;
+			this.isFullBuffer = isFullBuffer;
+			this.isFullRecord = isFullRecord;
 		}
 
 		public BufferBuilder getBufferBuilder() {
@@ -348,11 +351,11 @@ public class SpanningRecordSerializationTest extends TestLogger {
 		}
 
 		public boolean isFullBuffer() {
-			return serializationResult.isFullBuffer();
+			return isFullBuffer;
 		}
 
 		public boolean isFullRecord() {
-			return serializationResult.isFullRecord();
+			return isFullRecord;
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
deleted file mode 100644
index e5f5dfc..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api.serialization;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
-import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
-import org.apache.flink.testutils.serialization.types.Util;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Random;
-
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-
-/**
- * Tests for the {@link SpanningRecordSerializer}.
- */
-public class SpanningRecordSerializerTest {
-
-	@Test
-	public void testHasSerializedData() throws IOException {
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
-
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		serializer.serializeRecord(randomIntRecord);
-		Assert.assertTrue(serializer.hasSerializedData());
-
-		final BufferBuilder bufferBuilder1 = createBufferBuilder(16);
-		serializer.copyToBufferBuilder(bufferBuilder1);
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		final BufferBuilder bufferBuilder2 = createBufferBuilder(8);
-		serializer.reset();
-		serializer.copyToBufferBuilder(bufferBuilder2);
-		Assert.assertFalse(serializer.hasSerializedData());
-
-		serializer.reset();
-		serializer.copyToBufferBuilder(bufferBuilder2);
-		// Buffer builder full!
-		Assert.assertTrue(serializer.hasSerializedData());
-	}
-
-	@Test
-	public void testEmptyRecords() throws IOException {
-		final int segmentSize = 11;
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-		final BufferBuilder bufferBuilder1 = createBufferBuilder(segmentSize);
-
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
-			serializer.copyToBufferBuilder(bufferBuilder1));
-
-		SerializationTestType emptyRecord = new SerializationTestType() {
-			@Override
-			public SerializationTestType getRandom(Random rnd) {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public int length() {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public void write(DataOutputView out) {}
-
-			@Override
-			public void read(DataInputView in) {}
-
-			@Override
-			public int hashCode() {
-				throw new UnsupportedOperationException();
-			}
-
-			@Override
-			public boolean equals(Object obj) {
-				throw new UnsupportedOperationException();
-			}
-		};
-
-		serializer.serializeRecord(emptyRecord);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
-		serializer.reset();
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.copyToBufferBuilder(bufferBuilder1));
-
-		serializer.reset();
-		Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL,
-			serializer.copyToBufferBuilder(bufferBuilder1));
-
-		final BufferBuilder bufferBuilder2 = createBufferBuilder(segmentSize);
-		Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD,
-			serializer.copyToBufferBuilder(bufferBuilder2));
-	}
-
-	@Test
-	public void testIntRecordsSpanningMultipleSegments() throws Exception {
-		final int segmentSize = 1;
-		final int numValues = 10;
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testIntRecordsWithAlignedSegments() throws Exception {
-		final int segmentSize = 64;
-		final int numValues = 64;
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testIntRecordsWithUnalignedSegments() throws Exception {
-		final int segmentSize = 31;
-		final int numValues = 248; // least common multiple => last record should align
-
-		test(Util.randomRecords(numValues, SerializationTestTypeFactory.INT), segmentSize);
-	}
-
-	@Test
-	public void testRandomRecords() throws Exception {
-		final int segmentSize = 127;
-		final int numValues = 100000;
-
-		test(Util.randomRecords(numValues), segmentSize);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Iterates over the provided records and tests whether the {@link SpanningRecordSerializer} returns the expected
-	 * {@link RecordSerializer.SerializationResult} values.
-	 *
-	 * <p>Only a single {@link MemorySegment} will be allocated.
-	 *
-	 * @param records records to test
-	 * @param segmentSize size for the {@link MemorySegment}
-	 */
-	private void test(Util.MockRecords records, int segmentSize) throws Exception {
-		final int serializationOverhead = 4; // length encoding
-
-		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<>();
-
-		// -------------------------------------------------------------------------------------------------------------
-
-		BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
-		int numBytes = 0;
-		for (SerializationTestType record : records) {
-			serializer.serializeRecord(record);
-			RecordSerializer.SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
-			numBytes += record.length() + serializationOverhead;
-
-			if (numBytes < segmentSize) {
-				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
-			} else if (numBytes == segmentSize) {
-				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				bufferBuilder = createBufferBuilder(segmentSize);
-				numBytes = 0;
-			} else {
-				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
-
-				while (result.isFullBuffer()) {
-					numBytes -= segmentSize;
-					bufferBuilder = createBufferBuilder(segmentSize);
-					result = serializer.copyToBufferBuilder(bufferBuilder);
-				}
-
-				Assert.assertTrue(result.isFullRecord());
-			}
-		}
-	}
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 84aa17e..66eb8f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -18,79 +18,45 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
+import java.nio.ByteBuffer;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
  * {@link ResultPartitionWriter} that collects output on the List.
  */
 @ThreadSafe
 public abstract class AbstractCollectingResultPartitionWriter extends MockResultPartitionWriter {
-	private final BufferProvider bufferProvider;
-	private final ArrayDeque<BufferConsumer> bufferConsumers = new ArrayDeque<>();
-
-	public AbstractCollectingResultPartitionWriter(BufferProvider bufferProvider) {
-		this.bufferProvider = checkNotNull(bufferProvider);
-	}
-
-	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-	}
 
 	@Override
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		return bufferProvider.requestBufferBuilder(targetChannel);
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
+		checkArgument(targetSubpartition < getNumberOfSubpartitions());
+		deserializeRecord(record);
 	}
 
 	@Override
-	public synchronized boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
-		checkState(targetChannel < getNumberOfSubpartitions());
-		bufferConsumers.add(bufferConsumer);
-		processBufferConsumers();
-		return true;
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+		deserializeRecord(record);
 	}
 
-	private void processBufferConsumers() throws IOException {
-		while (!bufferConsumers.isEmpty()) {
-			BufferConsumer bufferConsumer = bufferConsumers.peek();
-			Buffer buffer = bufferConsumer.build();
-			try {
-				deserializeBuffer(buffer);
-				if (!bufferConsumer.isFinished()) {
-					break;
-				}
-				bufferConsumers.pop().close();
-			}
-			finally {
-				buffer.recycleBuffer();
-			}
-		}
-	}
+	private void deserializeRecord(ByteBuffer serializedRecord) throws IOException {
+		checkArgument(serializedRecord.hasArray());
 
-	@Override
-	public synchronized void flushAll() {
-		try {
-			processBufferConsumers();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void flush(int subpartitionIndex) {
-		flushAll();
+		MemorySegment segment = MemorySegmentFactory.wrap(serializedRecord.array());
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
+		buffer.setSize(serializedRecord.remaining());
+		deserializeBuffer(buffer);
+		buffer.recycleBuffer();
 	}
 
 	protected abstract void deserializeBuffer(Buffer buffer) throws IOException;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
index dccfd1d..d318961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriterTest.java
@@ -22,8 +22,10 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
@@ -31,12 +33,12 @@ import org.apache.flink.testutils.serialization.types.Util;
 
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 
 import static org.junit.Assert.assertEquals;
 
@@ -56,19 +58,12 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 	 */
 	@Test
 	public void testBroadcastMixedRandomEmitRecord() throws Exception {
-		final int numberOfChannels = 4;
+		final int numberOfChannels = 8;
 		final int numberOfRecords = 8;
 		final int bufferSize = 32;
 
-		@SuppressWarnings("unchecked")
-		final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+		final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
 		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 			new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
@@ -84,7 +79,7 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 		int index = 0;
 		for (SerializationTestType record : records) {
 			int randomChannel = index++ % numberOfChannels;
-			writer.randomEmit(record, randomChannel);
+			writer.emit(record, randomChannel);
 			serializedRecords.get(randomChannel).add(record);
 
 			writer.broadcastEmit(record);
@@ -93,23 +88,23 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 			}
 		}
 
-		final int numberOfCreatedBuffers = bufferProvider.getNumberOfCreatedBuffers();
+		final int numberOfCreatedBuffers = partition.getBufferPool().bestEffortGetNumOfUsedBuffers();
 		// verify the expected number of requested buffers, and it would always request a new buffer while random emitting
-		assertEquals(numberOfRecords, numberOfCreatedBuffers);
+		assertEquals(2 * numberOfRecords, numberOfCreatedBuffers);
 
 		for (int i = 0; i < numberOfChannels; i++) {
 			// every channel would queue the number of above crated buffers
-			assertEquals(numberOfRecords, queues[i].size());
+			assertEquals(numberOfRecords + 1, partition.getNumberOfQueuedBuffers(i));
 
 			final int excessRandomRecords = i < numberOfRecords % numberOfChannels ? 1 : 0;
 			final int numberOfRandomRecords = numberOfRecords / numberOfChannels + excessRandomRecords;
 			final int numberOfTotalRecords = numberOfRecords + numberOfRandomRecords;
 			// verify the data correctness in every channel queue
 			verifyDeserializationResults(
-				queues[i],
+				partition.createSubpartitionView(i, new NoOpBufferAvailablityListener()),
 				deserializer,
 				serializedRecords.get(i),
-				numberOfCreatedBuffers,
+				numberOfRecords + 1,
 				numberOfTotalRecords);
 		}
 	}
@@ -121,45 +116,41 @@ public class BroadcastRecordWriterTest extends RecordWriterTest {
 	@Test
 	public void testRandomEmitAndBufferRecycling() throws Exception {
 		int recordSize = 8;
+		int numberOfChannels = 2;
 
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(2, 2 * recordSize);
-		final KeepingPartitionWriter partitionWriter = new KeepingPartitionWriter(bufferProvider) {
-			@Override
-			public int getNumberOfSubpartitions() {
-				return 2;
-			}
-		};
-		final BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partitionWriter, 0, "test");
+		ResultPartition partition = createResultPartition(2 * recordSize, numberOfChannels);
+		BufferPool bufferPool = partition.getBufferPool();
+		BroadcastRecordWriter<SerializationTestType> writer = new BroadcastRecordWriter<>(partition, -1, "test");
 
 		// force materialization of both buffers for easier availability tests
-		List<Buffer> buffers = Arrays.asList(bufferProvider.requestBuffer(), bufferProvider.requestBuffer());
+		List<Buffer> buffers = Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer());
 		buffers.forEach(Buffer::recycleBuffer);
-		assertEquals(2, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(2, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// fill first buffer
-		writer.randomEmit(new IntType(1), 0);
+		writer.broadcastEmit(new IntType(1));
 		writer.broadcastEmit(new IntType(2));
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// simulate consumption of first buffer consumer; this should not free buffers
-		assertEquals(1, partitionWriter.getAddedBufferConsumers(0).size());
-		closeConsumer(partitionWriter, 0, 2 * recordSize);
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+		ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		closeConsumer(view0, 2 * recordSize);
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// use second buffer
-		writer.broadcastEmit(new IntType(3));
-		assertEquals(0, bufferProvider.getNumberOfAvailableBuffers());
+		writer.emit(new IntType(3), 0);
+		assertEquals(0, bufferPool.getNumberOfAvailableMemorySegments());
 
 		// fully free first buffer
-		assertEquals(2, partitionWriter.getAddedBufferConsumers(1).size());
-		closeConsumer(partitionWriter, 1, recordSize);
-		assertEquals(1, bufferProvider.getNumberOfAvailableBuffers());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+		ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+		closeConsumer(view1, 2 * recordSize);
+		assertEquals(1, bufferPool.getNumberOfAvailableMemorySegments());
 	}
 
-	public void closeConsumer(KeepingPartitionWriter partitionWriter, int subpartitionIndex, int expectedSize) {
-		BufferConsumer bufferConsumer = partitionWriter.getAddedBufferConsumers(subpartitionIndex).get(0);
-		Buffer buffer = bufferConsumer.build();
-		bufferConsumer.close();
+	public void closeConsumer(ResultSubpartitionView view, int expectedSize) throws IOException {
+		Buffer buffer = view.getNextBuffer().buffer();
 		assertEquals(expectedSize, buffer.getSize());
 		buffer.recycleBuffer();
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
index 1285c4e..4a9f7ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordCollectingResultPartitionWriter.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.api.writer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.types.Record;
 
 import java.io.IOException;
@@ -39,8 +38,7 @@ public class RecordCollectingResultPartitionWriter extends AbstractCollectingRes
 	private final RecordDeserializer<Record> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 		new String[]{System.getProperty("java.io.tmpdir")});
 
-	public RecordCollectingResultPartitionWriter(List<Record> output, BufferProvider bufferProvider) {
-		super(bufferProvider);
+	public RecordCollectingResultPartitionWriter(List<Record> output) {
 		this.output = checkNotNull(output);
 	}
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
index 3d3073b..5540a53 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordOrEventCollectingResultPartitionWriter.java
@@ -20,11 +20,9 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 
@@ -44,35 +42,32 @@ public class RecordOrEventCollectingResultPartitionWriter<T> extends AbstractCol
 
 	public RecordOrEventCollectingResultPartitionWriter(
 			Collection<Object> output,
-			BufferProvider bufferProvider,
 			TypeSerializer<T> serializer) {
-		super(bufferProvider);
 		this.output = checkNotNull(output);
 		this.delegate = new NonReusingDeserializationDelegate<>(checkNotNull(serializer));
 	}
 
 	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
+		output.add(event);
+	}
+
+	@Override
 	protected void deserializeBuffer(Buffer buffer) throws IOException {
-		if (buffer.isBuffer()) {
-			deserializer.setNextBuffer(buffer);
+		deserializer.setNextBuffer(buffer);
 
-			while (deserializer.hasUnfinishedData()) {
-				RecordDeserializer.DeserializationResult result =
-					deserializer.getNextRecord(delegate);
+		while (deserializer.hasUnfinishedData()) {
+			RecordDeserializer.DeserializationResult result =
+				deserializer.getNextRecord(delegate);
 
-				if (result.isFullRecord()) {
-					output.add(delegate.getInstance());
-				}
+			if (result.isFullRecord()) {
+				output.add(delegate.getInstance());
+			}
 
-				if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-					|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
-					break;
-				}
+			if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
+				|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
+				break;
 			}
-		} else {
-			// is event
-			AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
-			output.add(event);
 		}
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index adb059c..5a0ea8f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -20,28 +20,25 @@ package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -51,16 +48,17 @@ import static org.junit.Assert.assertTrue;
  */
 public class RecordWriterDelegateTest extends TestLogger {
 
+	private static final int recordSize = 8;
+
 	private static final int numberOfBuffers = 10;
 
 	private static final int memorySegmentSize = 128;
 
-	private static final int numberOfSegmentsToRequest = 2;
-
 	private NetworkBufferPool globalPool;
 
 	@Before
 	public void setup() {
+		assertEquals("Illegal memory segment size,", 0, memorySegmentSize % recordSize);
 		globalPool = new NetworkBufferPool(numberOfBuffers, memorySegmentSize);
 	}
 
@@ -103,11 +101,11 @@ public class RecordWriterDelegateTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	public void testSingleRecordWriterBroadcastEvent() throws Exception {
 		// setup
-		final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
-		final RecordWriter recordWriter = createRecordWriter(queues);
+		final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+		final RecordWriter recordWriter = new RecordWriterBuilder<>().build(partition);
 		final RecordWriterDelegate writerDelegate = new SingleRecordWriter(recordWriter);
 
-		verifyBroadcastEvent(writerDelegate, queues, 1);
+		verifyBroadcastEvent(writerDelegate, Collections.singletonList(partition));
 	}
 
 	@Test
@@ -116,14 +114,16 @@ public class RecordWriterDelegateTest extends TestLogger {
 		// setup
 		final int numRecordWriters = 2;
 		final List<RecordWriter> recordWriters = new ArrayList<>(numRecordWriters);
-		final ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
+		final List<ResultPartition> partitions = new ArrayList<>(numRecordWriters);
 
 		for (int i = 0; i < numRecordWriters; i++) {
-			recordWriters.add(createRecordWriter(queues));
+			final ResultPartition partition = RecordWriterTest.createResultPartition(memorySegmentSize, 2);
+			partitions.add(partition);
+			recordWriters.add(new RecordWriterBuilder<>().build(partition));
 		}
 		final RecordWriterDelegate writerDelegate = new MultipleRecordWriters(recordWriters);
 
-		verifyBroadcastEvent(writerDelegate, queues, numRecordWriters);
+		verifyBroadcastEvent(writerDelegate, partitions);
 	}
 
 	private RecordWriter createRecordWriter(NetworkBufferPool globalPool) throws Exception {
@@ -136,14 +136,6 @@ public class RecordWriterDelegateTest extends TestLogger {
 		return new RecordWriterBuilder().build(partition);
 	}
 
-	private RecordWriter createRecordWriter(ArrayDeque<BufferConsumer>[] queues) {
-		final ResultPartitionWriter partition = new RecordWriterTest.CollectingPartitionWriter(
-			queues,
-			new TestPooledBufferProvider(1));
-
-		return new RecordWriterBuilder().build(partition);
-	}
-
 	private void verifyAvailability(RecordWriterDelegate writerDelegate) throws Exception {
 		// writer is available at the beginning
 		assertTrue(writerDelegate.isAvailable());
@@ -151,13 +143,14 @@ public class RecordWriterDelegateTest extends TestLogger {
 
 		// request one buffer from the local pool to make it unavailable
 		RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
-		final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
+		for (int i = 0; i < memorySegmentSize / recordSize; ++i) {
+			recordWriter.emit(new IntValue(i));
+		}
 		assertFalse(writerDelegate.isAvailable());
 		CompletableFuture future = writerDelegate.getAvailableFuture();
 		assertFalse(future.isDone());
 
 		// recycle the buffer to make the local pool available again
-		BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
 		ResultSubpartitionView readView = recordWriter.getTargetPartition().createSubpartitionView(0, new NoOpBufferAvailablityListener());
 		Buffer buffer = readView.getNextBuffer().buffer();
 
@@ -169,18 +162,18 @@ public class RecordWriterDelegateTest extends TestLogger {
 
 	private void verifyBroadcastEvent(
 			RecordWriterDelegate writerDelegate,
-			ArrayDeque<BufferConsumer>[] queues,
-			int numRecordWriters) throws Exception {
+			List<ResultPartition> partitions) throws Exception {
 
 		final CancelCheckpointMarker message = new CancelCheckpointMarker(1);
 		writerDelegate.broadcastEvent(message);
 
 		// verify the added messages in all the queues
-		for (int i = 0; i < queues.length; i++) {
-			assertEquals(numRecordWriters, queues[i].size());
+		for (ResultPartition partition : partitions) {
+			for (int i = 0; i < partition.getNumberOfSubpartitions(); i++) {
+				assertEquals(1, partition.getNumberOfQueuedBuffers(i));
 
-			for (int j = 0; j < numRecordWriters; j++) {
-				BufferOrEvent boe = RecordWriterTest.parseBuffer(queues[i].remove(), i);
+				ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+				BufferOrEvent boe = RecordWriterTest.parseBuffer(view.getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(message, boe.getEvent());
 			}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 6cad1d3..a0956ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -27,34 +27,33 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.MockResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
+import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionTest;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.DeserializationUtils;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.operators.shipping.OutputEmitter;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
@@ -65,7 +64,6 @@ import org.apache.flink.testutils.serialization.types.Util;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.XORShiftRandom;
 
-import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -74,19 +72,13 @@ import org.junit.rules.TemporaryFolder;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Queue;
 import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -119,35 +111,27 @@ public class RecordWriterTest {
 		int numberOfChannels = 4;
 		int bufferSize = 32;
 
-		@SuppressWarnings("unchecked")
-		Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
-		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+		ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 919192L, Integer.MAX_VALUE + 18828228L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// No records emitted yet, broadcast should not request a buffer
 		writer.broadcastEvent(barrier);
 
-		assertEquals(0, bufferProvider.getNumberOfCreatedBuffers());
+		assertEquals(0, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			assertEquals(1, queues[i].size());
-			BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+			assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
 			assertTrue(boe.isEvent());
 			assertEquals(barrier, boe.getEvent());
-			assertEquals(0, queues[i].size());
+			assertFalse(view.isAvailable(Integer.MAX_VALUE));
 		}
 	}
 
 	/**
-	 * Tests broadcasting events when records have been emitted. The emitted
-	 * records cover all three {@link SerializationResult} types.
+	 * Tests broadcasting events when records have been emitted.
 	 */
 	@Test
 	public void testBroadcastEventMixedRecords() throws Exception {
@@ -156,16 +140,8 @@ public class RecordWriterTest {
 		int bufferSize = 32;
 		int lenBytes = 4; // serialized length
 
-		@SuppressWarnings("unchecked")
-		Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-
-		ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		RecordWriter<ByteArrayIO> writer = createRecordWriter(partitionWriter);
+		ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		RecordWriter<ByteArrayIO> writer = createRecordWriter(partition);
 		CheckpointBarrier barrier = new CheckpointBarrier(Integer.MAX_VALUE + 1292L, Integer.MAX_VALUE + 199L, CheckpointOptions.forCheckpointWithDefaultLocation());
 
 		// Emit records on some channels first (requesting buffers), then
@@ -194,34 +170,43 @@ public class RecordWriterTest {
 		writer.broadcastEvent(barrier);
 
 		if (isBroadcastWriter) {
-			assertEquals(3, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(3, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 
 			for (int i = 0; i < numberOfChannels; i++) {
-				assertEquals(4, queues[i].size()); // 3 buffer + 1 event
+				assertEquals(4, partition.getNumberOfQueuedBuffers(i)); // 3 buffer + 1 event
 
+				ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
 				for (int j = 0; j < 3; j++) {
-					assertTrue(parseBuffer(queues[i].remove(), 0).isBuffer());
+					assertTrue(parseBuffer(view.getNextBuffer().buffer(), 0).isBuffer());
 				}
 
-				BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+				BufferOrEvent boe = parseBuffer(view.getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(barrier, boe.getEvent());
 			}
 		} else {
-			assertEquals(4, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(4, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
+			ResultSubpartitionView[] views = new ResultSubpartitionView[4];
+
+			assertEquals(2, partition.getNumberOfQueuedBuffers(0)); // 1 buffer + 1 event
+			views[0] = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[0].getNextBuffer().buffer(), 0).isBuffer());
 
-			assertEquals(2, queues[0].size()); // 1 buffer + 1 event
-			assertTrue(parseBuffer(queues[0].remove(), 0).isBuffer());
-			assertEquals(3, queues[1].size()); // 2 buffers + 1 event
-			assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-			assertTrue(parseBuffer(queues[1].remove(), 1).isBuffer());
-			assertEquals(2, queues[2].size()); // 1 buffer + 1 event
-			assertTrue(parseBuffer(queues[2].remove(), 2).isBuffer());
-			assertEquals(1, queues[3].size()); // 0 buffers + 1 event
+			assertEquals(3, partition.getNumberOfQueuedBuffers(1)); // 2 buffers + 1 event
+			views[1] = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+			assertTrue(parseBuffer(views[1].getNextBuffer().buffer(), 1).isBuffer());
+
+			assertEquals(2, partition.getNumberOfQueuedBuffers(2)); // 1 buffer + 1 event
+			views[2] = partition.createSubpartitionView(2, new NoOpBufferAvailablityListener());
+			assertTrue(parseBuffer(views[2].getNextBuffer().buffer(), 2).isBuffer());
+
+			views[3] = partition.createSubpartitionView(3, new NoOpBufferAvailablityListener());
+			assertEquals(1, partition.getNumberOfQueuedBuffers(3)); // 0 buffers + 1 event
 
 			// every queue's last element should be the event
 			for (int i = 0; i < numberOfChannels; i++) {
-				BufferOrEvent boe = parseBuffer(queues[i].remove(), i);
+				BufferOrEvent boe = parseBuffer(views[i].getNextBuffer().buffer(), i);
 				assertTrue(boe.isEvent());
 				assertEquals(barrier, boe.getEvent());
 			}
@@ -234,31 +219,28 @@ public class RecordWriterTest {
 	 */
 	@Test
 	public void testBroadcastEventBufferReferenceCounting() throws Exception {
+		int bufferSize = 32 * 1024;
+		int numSubpartitions = 2;
 
-		@SuppressWarnings("unchecked")
-		ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[] { new ArrayDeque(), new ArrayDeque() };
-
-		ResultPartitionWriter partition =
-			new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+		ResultPartition partition = createResultPartition(bufferSize, numSubpartitions);
 		RecordWriter<?> writer = createRecordWriter(partition);
 
 		writer.broadcastEvent(EndOfPartitionEvent.INSTANCE);
 
-		// Verify added to all queues
-		assertEquals(1, queues[0].size());
-		assertEquals(1, queues[1].size());
-
 		// get references to buffer consumers (copies from the original event buffer consumer)
-		BufferConsumer bufferConsumer1 = queues[0].getFirst();
-		BufferConsumer bufferConsumer2 = queues[1].getFirst();
+		Buffer[] buffers = new Buffer[numSubpartitions];
 
 		// process all collected events (recycles the buffer)
-		for (int i = 0; i < queues.length; i++) {
-			assertTrue(parseBuffer(queues[i].remove(), i).isEvent());
+		for (int i = 0; i < numSubpartitions; i++) {
+			assertEquals(1, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			buffers[i] = view.getNextBuffer().buffer();
+			assertTrue(parseBuffer(buffers[i], i).isEvent());
 		}
 
-		assertTrue(bufferConsumer1.isRecycled());
-		assertTrue(bufferConsumer2.isRecycled());
+		for (int i = 0; i < numSubpartitions; ++i) {
+			assertTrue(buffers[i].isRecycled());
+		}
 	}
 
 	/**
@@ -289,15 +271,8 @@ public class RecordWriterTest {
 		final int numValues = 8;
 		final int serializationLength = 4;
 
-		@SuppressWarnings("unchecked")
-		final Queue<BufferConsumer>[] queues = new Queue[numberOfChannels];
-		for (int i = 0; i < numberOfChannels; i++) {
-			queues[i] = new ArrayDeque<>();
-		}
-
-		final TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(Integer.MAX_VALUE, bufferSize);
-		final ResultPartitionWriter partitionWriter = new CollectingPartitionWriter(queues, bufferProvider);
-		final RecordWriter<SerializationTestType> writer = createRecordWriter(partitionWriter);
+		final ResultPartition partition = createResultPartition(bufferSize, numberOfChannels);
+		final RecordWriter<SerializationTestType> writer = createRecordWriter(partition);
 		final RecordDeserializer<SerializationTestType> deserializer = new SpillingAdaptiveSpanningRecordDeserializer<>(
 			new String[]{ tempFolder.getRoot().getAbsolutePath() });
 
@@ -310,14 +285,15 @@ public class RecordWriterTest {
 
 		final int numRequiredBuffers = numValues / (bufferSize / (4 + serializationLength));
 		if (isBroadcastWriter) {
-			assertEquals(numRequiredBuffers, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(numRequiredBuffers, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 		} else {
-			assertEquals(numRequiredBuffers * numberOfChannels, bufferProvider.getNumberOfCreatedBuffers());
+			assertEquals(numRequiredBuffers * numberOfChannels, partition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 		}
 
 		for (int i = 0; i < numberOfChannels; i++) {
-			assertEquals(numRequiredBuffers, queues[i].size());
-			verifyDeserializationResults(queues[i], deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
+			assertEquals(numRequiredBuffers, partition.getNumberOfQueuedBuffers(i));
+			ResultSubpartitionView view = partition.createSubpartitionView(i, new NoOpBufferAvailablityListener());
+			verifyDeserializationResults(view, deserializer, serializedRecords.clone(), numRequiredBuffers, numValues);
 		}
 	}
 
@@ -345,7 +321,7 @@ public class RecordWriterTest {
 			assertTrue(recordWriter.getAvailableFuture().isDone());
 
 			// request one buffer from the local pool to make it unavailable afterwards
-			final BufferBuilder bufferBuilder = resultPartition.getBufferBuilder(0);
+			final BufferBuilder bufferBuilder = localPool.requestBufferBuilder(0);
 			assertNotNull(bufferBuilder);
 			assertFalse(recordWriter.getAvailableFuture().isDone());
 
@@ -418,64 +394,8 @@ public class RecordWriterTest {
 		}
 	}
 
-	@Test
-	public void testIdleTime() throws IOException, InterruptedException {
-		// setup
-		final NetworkBufferPool globalPool = new NetworkBufferPool(10, 128);
-		final BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
-		final ResultPartitionWriter resultPartition = new ResultPartitionBuilder()
-			.setBufferPoolFactory(p -> localPool)
-			.build();
-		resultPartition.setup();
-		final ResultPartitionWriter partitionWrapper = new ConsumableNotifyingResultPartitionWriterDecorator(
-			new NoOpTaskActions(),
-			new JobID(),
-			resultPartition,
-			new NoOpResultPartitionConsumableNotifier());
-		final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
-		BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
-		BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
-		ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
-		Buffer buffer = readView.getNextBuffer().buffer();
-
-		// idle time is zero when there is buffer available.
-		assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
-
-		CountDownLatch syncLock = new CountDownLatch(1);
-		AtomicReference<BufferBuilder> asyncRequestResult = new AtomicReference<>();
-		final Thread requestThread = new Thread(new Runnable() {
-			@Override
-			public void run() {
-				try {
-					// notify that the request thread start to run.
-					syncLock.countDown();
-					// wait for buffer.
-					asyncRequestResult.set(recordWriter.requestNewBufferBuilder(0));
-				} catch (Exception e) {
-				}
-			}
-		});
-		requestThread.start();
-
-		// wait until request thread start to run.
-		syncLock.await();
-
-		Thread.sleep(10);
-
-		//recycle the buffer
-		buffer.recycleBuffer();
-		requestThread.join();
-
-		assertThat(recordWriter.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
-		assertNotNull(asyncRequestResult.get());
-	}
-
 	private void verifyBroadcastBufferOrEventIndependence(boolean broadcastEvent) throws Exception {
-		@SuppressWarnings("unchecked")
-		ArrayDeque<BufferConsumer>[] queues = new ArrayDeque[]{new ArrayDeque(), new ArrayDeque()};
-
-		ResultPartitionWriter partition =
-			new CollectingPartitionWriter(queues, new TestPooledBufferProvider(Integer.MAX_VALUE));
+		ResultPartition partition = createResultPartition(4096, 2);
 		RecordWriter<IntValue> writer = createRecordWriter(partition);
 
 		if (broadcastEvent) {
@@ -485,12 +405,15 @@ public class RecordWriterTest {
 		}
 
 		// verify added to all queues
-		assertEquals(1, queues[0].size());
-		assertEquals(1, queues[1].size());
+		assertEquals(1, partition.getNumberOfQueuedBuffers(0));
+		assertEquals(1, partition.getNumberOfQueuedBuffers(1));
+
+		ResultSubpartitionView view0 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		ResultSubpartitionView view1 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
 
 		// these two buffers may share the memory but not the indices!
-		Buffer buffer1 = buildSingleBuffer(queues[0].remove());
-		Buffer buffer2 = buildSingleBuffer(queues[1].remove());
+		Buffer buffer1 = view0.getNextBuffer().buffer();
+		Buffer buffer2 = view1.getNextBuffer().buffer();
 		assertEquals(0, buffer1.getReaderIndex());
 		assertEquals(0, buffer2.getReaderIndex());
 		buffer1.setReaderIndex(1);
@@ -498,18 +421,19 @@ public class RecordWriterTest {
 	}
 
 	protected void verifyDeserializationResults(
-			Queue<BufferConsumer> queue,
+			ResultSubpartitionView view,
 			RecordDeserializer<SerializationTestType> deserializer,
 			ArrayDeque<SerializationTestType> expectedRecords,
 			int numRequiredBuffers,
 			int numValues) throws Exception {
 		int assertRecords = 0;
 		for (int j = 0; j < numRequiredBuffers; j++) {
-			Buffer buffer = buildSingleBuffer(queue.remove());
+			Buffer buffer = view.getNextBuffer().buffer();
 			deserializer.setNextBuffer(buffer);
 
 			assertRecords += DeserializationUtils.deserializeRecords(expectedRecords, deserializer);
 		}
+		assertFalse(view.isAvailable(Integer.MAX_VALUE));
 		Assert.assertEquals(numValues, assertRecords);
 	}
 
@@ -526,51 +450,18 @@ public class RecordWriterTest {
 		}
 	}
 
+	public static ResultPartition createResultPartition(int bufferSize, int numSubpartitions) throws IOException { // test helper: builds a partition and calls setup() on it before returning
+		NettyShuffleEnvironment env = new NettyShuffleEnvironmentBuilder().setBufferSize(bufferSize).build(); // NOTE(review): env is neither closed nor returned from this helper -- confirm that is acceptable for tests
+		ResultPartition partition = createPartition(env, ResultPartitionType.PIPELINED, numSubpartitions); // partition type is fixed to PIPELINED; only the subpartition count is configurable
+		partition.setup(); // done here so callers can use the returned partition without calling setup() themselves
+		return partition;
+	}
+
 	// ---------------------------------------------------------------------------------------------
 	// Helpers
 	// ---------------------------------------------------------------------------------------------
 
-	/**
-	 * Partition writer that collects the added buffers/events in multiple queue.
-	 */
-	static class CollectingPartitionWriter extends MockResultPartitionWriter {
-		private final Queue<BufferConsumer>[] queues;
-		private final BufferProvider bufferProvider;
-
-		/**
-		 * Create the partition writer.
-		 *
-		 * @param queues one queue per outgoing channel
-		 * @param bufferProvider buffer provider
-		 */
-		CollectingPartitionWriter(Queue<BufferConsumer>[] queues, BufferProvider bufferProvider) {
-			this.queues = queues;
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public int getNumberOfSubpartitions() {
-			return queues.length;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-
-		@Override
-		public boolean addBufferConsumer(BufferConsumer buffer, int targetChannel) {
-			return queues[targetChannel].add(buffer);
-		}
-	}
-
-	static BufferOrEvent parseBuffer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
-		Buffer buffer = buildSingleBuffer(bufferConsumer);
+	static BufferOrEvent parseBuffer(Buffer buffer, int targetChannel) throws IOException {
 		if (buffer.isBuffer()) {
 			return new BufferOrEvent(buffer, new InputChannelInfo(0, targetChannel));
 		} else {
@@ -581,68 +472,6 @@ public class RecordWriterTest {
 		}
 	}
 
-	/**
-	 * Partition writer that recycles all received buffers and does no further processing.
-	 */
-	private static class RecyclingPartitionWriter extends MockResultPartitionWriter {
-		private final BufferProvider bufferProvider;
-
-		private RecyclingPartitionWriter(BufferProvider bufferProvider) {
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-	}
-
-	static class KeepingPartitionWriter extends MockResultPartitionWriter {
-		private final BufferProvider bufferProvider;
-		private Map<Integer, List<BufferConsumer>> produced = new HashMap<>();
-
-		KeepingPartitionWriter(BufferProvider bufferProvider) {
-			this.bufferProvider = bufferProvider;
-		}
-
-		@Override
-		public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-			return bufferProvider.requestBufferBuilderBlocking(targetChannel);
-		}
-
-		@Override
-		public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-			return bufferProvider.requestBufferBuilder(targetChannel);
-		}
-
-		@Override
-		public boolean addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) {
-			// keep the buffer occupied.
-			produced.putIfAbsent(targetChannel, new ArrayList<>());
-			produced.get(targetChannel).add(bufferConsumer);
-			return true;
-		}
-
-		public List<BufferConsumer> getAddedBufferConsumers(int subpartitionIndex) {
-			return produced.get(subpartitionIndex);
-		}
-
-		@Override
-		public void close() {
-			for (List<BufferConsumer> bufferConsumers : produced.values()) {
-				for (BufferConsumer bufferConsumer : bufferConsumers) {
-					bufferConsumer.close();
-				}
-			}
-			produced.clear();
-		}
-	}
-
 	private static class ByteArrayIO implements IOReadableWritable {
 
 		private final byte[] bytes;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
index 3b6a2f5..f9616bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java
@@ -21,22 +21,21 @@ package org.apache.flink.runtime.io.network.netty;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
 import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionTest;
 import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
@@ -55,9 +54,11 @@ import org.junit.rules.TemporaryFolder;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
@@ -478,14 +479,12 @@ public class PartitionRequestQueueTest {
 	}
 
 	private static ResultPartition createFinishedPartitionWithFilledData(ResultPartitionManager partitionManager) throws Exception {
-		final ResultPartition partition = new ResultPartitionBuilder()
-			.setResultPartitionType(ResultPartitionType.BLOCKING)
-			.setFileChannelManager(fileChannelManager)
-			.setResultPartitionManager(partitionManager)
-			.build();
-
-		partitionManager.registerResultPartition(partition);
-		PartitionTestUtils.writeBuffers(partition, 1, BUFFER_SIZE);
+		NettyShuffleEnvironment environment = new NettyShuffleEnvironmentBuilder().setResultPartitionManager(partitionManager).build();
+		ResultPartition partition = createPartition(environment, fileChannelManager, ResultPartitionType.BLOCKING, 1);
+
+		partition.setup();
+		partition.emitRecord(ByteBuffer.allocate(BUFFER_SIZE), 0);
+		partition.finish();
 
 		return partition;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
index ad21bf4..220225e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockResultPartitionWriter.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -54,14 +55,15 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 	}
 
 	@Override
-	public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
-		bufferConsumer.close();
-		return true;
+	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
 	}
 
 	@Override
-	public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
-		throw new UnsupportedOperationException();
+	public void broadcastRecord(ByteBuffer record) throws IOException {
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event, boolean isPriorityEvent) throws IOException {
 	}
 
 	@Override
@@ -69,8 +71,8 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 		throw new UnsupportedOperationException();
 	}
 
-	public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
-		throw new UnsupportedOperationException();
+	@Override
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
 	}
 
 	@Override
@@ -90,6 +92,20 @@ public class MockResultPartitionWriter implements ResultPartitionWriter {
 	}
 
 	@Override
+	public boolean isFinished() {
+		return false;
+	}
+
+	@Override
+	public void release(Throwable cause) {
+	}
+
+	@Override
+	public boolean isReleased() {
+		return false;
+	}
+
+	@Override
 	public CompletableFuture<?> getAvailableFuture() {
 		return AVAILABLE;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 462cc4b..1ee8dd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -39,6 +38,8 @@ import org.apache.flink.util.TestLogger;
 import org.junit.ClassRule;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
+
 /**
  * Test for consuming a pipelined result only partially.
  */
@@ -118,10 +119,8 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
 
 			for (int i = 0; i < 8; i++) {
-				final BufferBuilder bufferBuilder = writer.getBufferBuilder(0);
-				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+				writer.emitRecord(ByteBuffer.allocate(1024), 0);
 				Thread.sleep(50);
-				bufferBuilder.finish();
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index a545b73..c6d7819 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -31,7 +30,6 @@ import org.hamcrest.Matchers;
 
 import java.io.IOException;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 
@@ -131,14 +129,4 @@ public enum PartitionTestUtils {
 			1,
 			true);
 	}
-
-	public static void writeBuffers(
-			ResultPartitionWriter partition,
-			int numberOfBuffers,
-			int bufferSize) throws IOException {
-		for (int i = 0; i < numberOfBuffers; i++) {
-			partition.addBufferConsumer(createFilledFinishedBufferConsumer(bufferSize), 0);
-		}
-		partition.finish();
-	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 95aee00..c806a19 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -23,12 +23,9 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
 import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
@@ -39,7 +36,6 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -47,7 +43,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
-import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -141,41 +136,32 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
-		final int producerBufferPoolSize = 8;
 		final int producerNumberOfBuffersToProduce = 128;
+		final int bufferSize = 32 * 1024;
 
 		// Producer behaviour
 		final TestProducerSource producerSource = new TestProducerSource() {
 
-			private BufferProvider bufferProvider = new TestPooledBufferProvider(producerBufferPoolSize);
-
 			private int numberOfBuffers;
 
 			@Override
-			public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+			public BufferAndChannel getNextBuffer() throws Exception {
 				if (numberOfBuffers == producerNumberOfBuffersToProduce) {
 					return null;
 				}
 
-				final BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
-				final BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-				int segmentSize = bufferBuilder.getMaxCapacity();
-
-				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
+				MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
 
-				int next = numberOfBuffers * (segmentSize / Integer.BYTES);
+				int next = numberOfBuffers * (bufferSize / Integer.BYTES);
 
-				for (int i = 0; i < segmentSize; i += 4) {
+				for (int i = 0; i < bufferSize; i += 4) {
 					segment.putInt(i, next);
 					next++;
 				}
 
-				checkState(bufferBuilder.appendAndCommit(ByteBuffer.wrap(segment.getArray())) == segmentSize);
-				bufferBuilder.finish();
-
 				numberOfBuffers++;
 
-				return new BufferConsumerAndChannel(bufferConsumer, 0);
+				return new BufferAndChannel(segment.getArray(), 0);
 			}
 		};
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index e293422..d221e99 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -26,49 +26,46 @@ import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.FileChannelManagerImpl;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.concurrent.FutureConsumerWithException;
 
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledFinishedBufferConsumer;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition;
 import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.verifyCreateSubpartitionViewThrowsException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /**
  * Tests for {@link ResultPartition}.
@@ -79,6 +76,8 @@ public class ResultPartitionTest {
 
 	private static FileChannelManager fileChannelManager;
 
+	private final int bufferSize = 1024;
+
 	@BeforeClass
 	public static void setUp() {
 		fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing");
@@ -115,32 +114,45 @@ public class ResultPartitionTest {
 	 */
 	@Test
 	public void testSendScheduleOrUpdateConsumersMessage() throws Exception {
+		FutureConsumerWithException[] notificationCalls = new FutureConsumerWithException[] {
+			writer -> ((ResultPartitionWriter) writer).finish(),
+			writer -> ((ResultPartitionWriter) writer).emitRecord(ByteBuffer.allocate(bufferSize), 0),
+			writer -> ((ResultPartitionWriter) writer).broadcastEvent(EndOfPartitionEvent.INSTANCE, false),
+			writer -> ((ResultPartitionWriter) writer).broadcastRecord(ByteBuffer.allocate(bufferSize))
+		};
+
+		for (FutureConsumerWithException notificationCall: notificationCalls) {
+			testSendScheduleOrUpdateConsumersMessage(notificationCall);
+		}
+	}
+
+	private void testSendScheduleOrUpdateConsumersMessage(
+			FutureConsumerWithException<ResultPartitionWriter, Exception> notificationCall) throws Exception {
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
 
 		{
 			// Pipelined, send message => notify
-			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 			ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
 				ResultPartitionType.PIPELINED,
 				taskActions,
 				jobId,
 				notifier);
-			consumableNotifyingPartitionWriter.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
-			verify(notifier, times(1))
-				.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+			notificationCall.accept(consumableNotifyingPartitionWriter);
+			notifier.check(jobId, consumableNotifyingPartitionWriter.getPartitionId(), taskActions, 1);
 		}
 
 		{
 			// Blocking, send message => don't notify
-			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+			TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 			ResultPartitionWriter partition = createConsumableNotifyingResultPartitionWriter(
 				ResultPartitionType.BLOCKING,
 				taskActions,
 				jobId,
 				notifier);
-			partition.addBufferConsumer(createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE), 0);
-			verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+			notificationCall.accept(partition);
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -181,38 +193,32 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already finished.
+	 * Tests {@link ResultPartition#emitRecord} on a partition which has already finished.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
 	private void testAddOnFinishedPartition(final ResultPartitionType partitionType) throws Exception {
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
-		JobID jobId = new JobID();
-		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
-			partitionType,
-			taskActions,
-			jobId,
-			notifier);
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
+			new NoOpTaskActions(),
+			new JobID(),
+			notifier)[0];
 		try {
-			consumableNotifyingPartitionWriter.finish();
-			reset(notifier);
-			// partition.add() should fail
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			Assert.fail("exception expected");
+			partitionWriter.finish();
+			notifier.reset();
+			// partitionWriter.emitRecord() should fail
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} catch (IllegalStateException e) {
 			// expected => ignored
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-				Assert.fail("bufferConsumer not recycled");
-			}
+			assertEquals(0, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(0, bufferWritingResultPartition.numBytesOut.getCount());
+			assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should not have notified either
-			verify(notifier, never()).notifyPartitionConsumable(
-				eq(jobId),
-				eq(consumableNotifyingPartitionWriter.getPartitionId()),
-				eq(taskActions));
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -227,35 +233,30 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer} on a partition which has already been released.
+	 * Tests {@link ResultPartition#emitRecord} on a partition which has already been released.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
-	private void testAddOnReleasedPartition(final ResultPartitionType partitionType) throws Exception {
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
-		JobID jobId = new JobID();
-		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
-			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
-		ResultPartitionWriter consumableNotifyingPartitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+	private void testAddOnReleasedPartition(ResultPartitionType partitionType) throws Exception {
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {partition},
-			taskActions,
-			jobId,
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
+			new NoOpTaskActions(),
+			new JobID(),
 			notifier)[0];
 		try {
-			partition.release();
-			// partition.add() silently drops the bufferConsumer but recycles it
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			assertTrue(partition.isReleased());
+			partitionWriter.release(null);
+			// partitionWriter.emitRecord() should silently drop the given record
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-				Assert.fail("bufferConsumer not recycled");
-			}
+			assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+			// the buffer should have been recycled because the result partition has already been released
+			assertEquals(0, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should not have notified either
-			verify(notifier, never()).notifyPartitionConsumable(eq(jobId), eq(partition.getPartitionId()), eq(taskActions));
+			notifier.check(null, null, null, 0);
 		}
 	}
 
@@ -289,32 +290,31 @@ public class ResultPartitionTest {
 	}
 
 	/**
-	 * Tests {@link ResultPartition#addBufferConsumer(BufferConsumer, int)} on a working partition.
+	 * Tests {@link ResultPartition#emitRecord} on a working partition.
 	 *
 	 * @param partitionType the result partition type to set up
 	 */
 	private void testAddOnPartition(final ResultPartitionType partitionType) throws Exception {
-		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		TestResultPartitionConsumableNotifier notifier = new TestResultPartitionConsumableNotifier();
 		JobID jobId = new JobID();
 		TaskActions taskActions = new NoOpTaskActions();
-		ResultPartitionWriter consumableNotifyingPartitionWriter = createConsumableNotifyingResultPartitionWriter(
-			partitionType,
+		BufferWritingResultPartition bufferWritingResultPartition = createResultPartition(partitionType);
+		ResultPartitionWriter partitionWriter = ConsumableNotifyingResultPartitionWriterDecorator.decorate(
+			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
+			new ResultPartitionWriter[]{bufferWritingResultPartition},
 			taskActions,
 			jobId,
-			notifier);
-		BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(BufferBuilderTestUtils.BUFFER_SIZE);
+			notifier)[0];
 		try {
-			// partition.add() adds the bufferConsumer without recycling it (if not spilling)
-			consumableNotifyingPartitionWriter.addBufferConsumer(bufferConsumer, 0);
-			assertFalse("bufferConsumer should not be recycled (still in the queue)", bufferConsumer.isRecycled());
+			// partitionWriter.emitRecord() allocates a new buffer and copies the record into it
+			partitionWriter.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 		} finally {
-			if (!bufferConsumer.isRecycled()) {
-				bufferConsumer.close();
-			}
+			assertEquals(1, bufferWritingResultPartition.numBuffersOut.getCount());
+			assertEquals(bufferSize, bufferWritingResultPartition.numBytesOut.getCount());
+			assertEquals(1, bufferWritingResultPartition.getBufferPool().bestEffortGetNumOfUsedBuffers());
 			// should have been notified for pipelined partitions
 			if (partitionType.isPipelined()) {
-				verify(notifier, times(1))
-					.notifyPartitionConsumable(eq(jobId), eq(consumableNotifyingPartitionWriter.getPartitionId()), eq(taskActions));
+				notifier.check(jobId, partitionWriter.getPartitionId(), taskActions, 1);
 			}
 		}
 	}
@@ -332,15 +332,14 @@ public class ResultPartitionTest {
 	private void testReleaseMemory(final ResultPartitionType resultPartitionType) throws Exception {
 		final int numAllBuffers = 10;
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
-			.setNumNetworkBuffers(numAllBuffers).build();
+			.setNumNetworkBuffers(numAllBuffers).setBufferSize(bufferSize).build();
 		final ResultPartition resultPartition = createPartition(network, resultPartitionType, 1);
 		try {
 			resultPartition.setup();
 
 			// take all buffers (more than the minimum required)
 			for (int i = 0; i < numAllBuffers; ++i) {
-				BufferBuilder bufferBuilder = resultPartition.getBufferPool().requestBufferBuilderBlocking();
-				resultPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
+				resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 			}
 			resultPartition.finish();
 
@@ -366,10 +365,12 @@ public class ResultPartitionTest {
 	 * Tests {@link ResultPartition#getAvailableFuture()}.
 	 */
 	@Test
-	public void testIsAvailableOrNot() throws IOException, InterruptedException {
+	public void testIsAvailableOrNot() throws IOException {
 		final int numAllBuffers = 10;
+		final int bufferSize = 1024;
 		final NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
-				.setNumNetworkBuffers(numAllBuffers).build();
+				.setNumNetworkBuffers(numAllBuffers)
+				.setBufferSize(bufferSize).build();
 		final ResultPartition resultPartition = createPartition(network, ResultPartitionType.PIPELINED, 1);
 
 		try {
@@ -379,8 +380,8 @@ public class ResultPartitionTest {
 
 			assertTrue(resultPartition.getAvailableFuture().isDone());
 
-			resultPartition.getBufferBuilder(0);
-			resultPartition.getBufferBuilder(0);
+			resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+			resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
 			assertFalse(resultPartition.getAvailableFuture().isDone());
 		} finally {
 			resultPartition.release();
@@ -434,17 +435,25 @@ public class ResultPartitionTest {
 			ResultPartitionType partitionType,
 			TaskActions taskActions,
 			JobID jobId,
-			ResultPartitionConsumableNotifier notifier) {
-		ResultPartition partition = partitionType == ResultPartitionType.BLOCKING ?
-			createPartition(partitionType, fileChannelManager) : createPartition(partitionType);
+			ResultPartitionConsumableNotifier notifier) throws IOException {
+		ResultPartition partition = createResultPartition(partitionType);
 		return ConsumableNotifyingResultPartitionWriterDecorator.decorate(
 			Collections.singleton(PartitionTestUtils.createPartitionDeploymentDescriptor(partitionType)),
-			new ResultPartitionWriter[] {partition},
+			new ResultPartitionWriter[]{partition},
 			taskActions,
 			jobId,
 			notifier)[0];
 	}
 
+	private BufferWritingResultPartition createResultPartition(ResultPartitionType partitionType) throws IOException {
+		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
+			.setNumNetworkBuffers(10)
+			.setBufferSize(bufferSize).build();
+		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1);
+		resultPartition.setup();
+		return (BufferWritingResultPartition) resultPartition;
+	}
+
 	@Test
 	public void testInitializeEmptyState() throws Exception {
 		final int totalBuffers = 2;
@@ -558,6 +567,50 @@ public class ResultPartitionTest {
 		}
 	}
 
+	@Test
+	public void testIdleTime() throws IOException, InterruptedException {
+		// setup
+		int bufferSize = 1024;
+		NetworkBufferPool globalPool = new NetworkBufferPool(10, bufferSize);
+		BufferPool localPool = globalPool.createBufferPool(1, 1, null, 1, Integer.MAX_VALUE);
+		BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
+			.setBufferPoolFactory(p -> localPool)
+			.build();
+		resultPartition.setup();
+
+		resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+		ResultSubpartitionView readView = resultPartition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		Buffer buffer = readView.getNextBuffer().buffer();
+		assertNotNull(buffer);
+
+		// idle time is zero as long as a buffer was available for the write.
+		assertEquals(0, resultPartition.getIdleTimeMsPerSecond().getCount());
+
+		CountDownLatch syncLock = new CountDownLatch(1);
+		final Thread requestThread = new Thread(() -> {
+			try {
+				// signal that the request thread has started running.
+				syncLock.countDown();
+				// wait for buffer.
+				resultPartition.emitRecord(ByteBuffer.allocate(bufferSize), 0);
+			} catch (Exception e) {
+			}
+		});
+		requestThread.start();
+
+		// wait until the request thread has started running.
+		syncLock.await();
+
+		Thread.sleep(100);
+
+		// recycle the buffer so that the pending emitRecord() in the request thread can complete
+		buffer.recycleBuffer();
+		requestThread.join();
+
+		Assert.assertThat(resultPartition.getIdleTimeMsPerSecond().getCount(), Matchers.greaterThan(0L));
+		assertNotNull(readView.getNextBuffer().buffer());
+	}
+
 	/**
 	 * The {@link ChannelStateReader} instance for restoring the specific number of states.
 	 */
@@ -631,4 +684,33 @@ public class ResultPartitionTest {
 		public void close() {
 		}
 	}
+
+	private static class TestResultPartitionConsumableNotifier implements ResultPartitionConsumableNotifier {
+		private JobID jobID;
+		private ResultPartitionID partitionID;
+		private TaskActions taskActions;
+		private int numNotification;
+
+		@Override
+		public void notifyPartitionConsumable(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions) {
+			++numNotification;
+			this.jobID = jobID;
+			this.partitionID = partitionID;
+			this.taskActions = taskActions;
+		}
+
+		private void check(JobID jobID, ResultPartitionID partitionID, TaskActions taskActions, int numNotification) {
+			assertEquals(jobID, this.jobID);
+			assertEquals(partitionID, this.partitionID);
+			assertEquals(taskActions, this.taskActions);
+			assertEquals(numNotification, this.numNotification);
+		}
+
+		private void reset() {
+			jobID = null;
+			partitionID = null;
+			taskActions = null;
+			numNotification = 0;
+		}
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4a5f445..a4985cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -32,6 +32,7 @@ import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Optional;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
@@ -49,7 +50,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 	private MutableObjectIterator<T> inputIterator;
 
-	private RecordSerializer<T> serializer;
+	private DataOutputSerializer serializer;
 
 	private final T reuse;
 
@@ -68,7 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 	private IteratorWrappingTestSingleInputGate<T> wrapIterator(MutableObjectIterator<T> iterator) throws IOException, InterruptedException {
 		inputIterator = iterator;
-		serializer = new SpanningRecordSerializer<T>();
+		serializer = new DataOutputSerializer(128);
 
 		// The input iterator can produce an infinite stream. That's why we have to serialize each
 		// record on demand and cannot do it upfront.
@@ -79,10 +80,10 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 			@Override
 			public Optional<BufferAndAvailability> getBufferAvailability() throws IOException {
 				if (hasData) {
-					serializer.serializeRecord(reuse);
+					ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, reuse);
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
 					BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-					serializer.copyToBufferBuilder(bufferBuilder);
+					bufferBuilder.appendAndCommit(serializedRecord);
 
 					hasData = inputIterator.next(reuse) != null;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 6e65ad74..90cbc6f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,13 +28,12 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
@@ -60,7 +59,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -135,8 +133,8 @@ public class LocalInputChannelTest {
 				.setNumberOfSubpartitions(parallelism)
 				.setNumTargetKeyGroups(parallelism)
 				.setResultPartitionManager(partitionManager)
-				.setBufferPoolFactory(p ->
-					networkBuffers.createBufferPool(producerBufferPoolSize, producerBufferPoolSize))
+				.setBufferPoolFactory(p -> networkBuffers.createBufferPool(
+					producerBufferPoolSize, producerBufferPoolSize, null, parallelism, Integer.MAX_VALUE))
 				.build();
 
 			// Create a buffer pool for this partition
@@ -144,11 +142,11 @@ public class LocalInputChannelTest {
 
 			// Create the producer
 			partitionProducers[i] = new TestPartitionProducer(
-				partition,
+				(BufferWritingResultPartition) partition,
 				false,
 				new TestPartitionProducerBufferSource(
 					parallelism,
-					partition.getBufferPool(),
+					TestBufferFactory.BUFFER_SIZE,
 					numberOfBuffersPerChannel)
 			);
 		}
@@ -519,16 +517,16 @@ public class LocalInputChannelTest {
 	 */
 	private static class TestPartitionProducerBufferSource implements TestProducerSource {
 
-		private final BufferProvider bufferProvider;
+		private final int bufferSize;
 
 		private final List<Byte> channelIndexes;
 
 		public TestPartitionProducerBufferSource(
 				int parallelism,
-				BufferProvider bufferProvider,
+				int bufferSize,
 				int numberOfBuffersToProduce) {
 
-			this.bufferProvider = bufferProvider;
+			this.bufferSize = bufferSize;
 			this.channelIndexes = Lists.newArrayListWithCapacity(
 					parallelism * numberOfBuffersToProduce);
 
@@ -544,14 +542,10 @@ public class LocalInputChannelTest {
 		}
 
 		@Override
-		public BufferConsumerAndChannel getNextBufferConsumer() throws Exception {
+		public BufferAndChannel getNextBuffer() throws Exception {
 			if (channelIndexes.size() > 0) {
 				final int channelIndex = channelIndexes.remove(0);
-				BufferBuilder bufferBuilder = bufferProvider.requestBufferBuilderBlocking();
-				BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-				bufferBuilder.appendAndCommit(ByteBuffer.wrap(new byte[4]));
-				bufferBuilder.finish();
-				return new BufferConsumerAndChannel(bufferConsumer, channelIndex);
+				return new BufferAndChannel(new byte[bufferSize], channelIndex);
 			}
 
 			return null;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3787627..d16ee7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.io.network.TaskEventPublisher;
 import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderAndConsumerTest;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -42,6 +41,7 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
 import org.apache.flink.runtime.io.network.partition.NoOpResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
@@ -753,7 +753,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testQueuedBuffers() throws Exception {
 		final NettyShuffleEnvironment network = createNettyShuffleEnvironment();
 
-		final ResultPartition resultPartition = new ResultPartitionBuilder()
+		final BufferWritingResultPartition resultPartition = (BufferWritingResultPartition) new ResultPartitionBuilder()
 			.setResultPartitionManager(network.getResultPartitionManager())
 			.setupBufferPoolFactoryFromNettyShuffleEnvironment(network)
 			.build();
@@ -784,7 +784,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			remoteInputChannel.onBuffer(createBuffer(1), 0, 0);
 			assertEquals(1, inputGate.getNumberOfQueuedBuffers());
 
-			resultPartition.addBufferConsumer(BufferBuilderTestUtils.createFilledFinishedBufferConsumer(1), 0);
+			resultPartition.emitRecord(ByteBuffer.allocate(1), 0);
 			assertEquals(2, inputGate.getNumberOfQueuedBuffers());
 		} finally {
 			resultPartition.release();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
index 2dfb4c9..9d94402 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPartitionProducer.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.io.network.util;
 
-import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
 
+import java.nio.ByteBuffer;
 import java.util.Random;
 import java.util.concurrent.Callable;
 
@@ -38,7 +39,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
 	public static final int MAX_SLEEP_TIME_MS = 20;
 
 	/** The partition to add data to. */
-	private final ResultPartition partition;
+	private final BufferWritingResultPartition partition;
 
 	/**
 	 * Flag indicating whether the consumer is slow. If true, the consumer will sleep a random
@@ -53,7 +54,7 @@ public class TestPartitionProducer implements Callable<Boolean> {
 	private final Random random;
 
 	public TestPartitionProducer(
-			ResultPartition partition,
+			BufferWritingResultPartition partition,
 			boolean isSlowProducer,
 			TestProducerSource source) {
 
@@ -69,10 +70,11 @@ public class TestPartitionProducer implements Callable<Boolean> {
 		boolean success = false;
 
 		try {
-			BufferConsumerAndChannel consumerAndChannel;
+			BufferAndChannel bufferAndChannel;
 
-			while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
-				partition.addBufferConsumer(consumerAndChannel.getBufferConsumer(), consumerAndChannel.getTargetChannel());
+			while ((bufferAndChannel = source.getNextBuffer()) != null) {
+				ByteBuffer record = ByteBuffer.wrap(bufferAndChannel.getBuffer());
+				partition.emitRecord(record, bufferAndChannel.getTargetChannel());
 
 				// Check for interrupted flag after adding data to prevent resource leaks
 				if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
index f5d97f5..99006e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestProducerSource.java
@@ -29,19 +29,19 @@ public interface TestProducerSource {
 	 *
 	 * <p> The channel index specifies the subpartition add the data to.
 	 */
-	BufferConsumerAndChannel getNextBufferConsumer() throws Exception;
+	BufferAndChannel getNextBuffer() throws Exception;
 
-	class BufferConsumerAndChannel {
-		private final BufferConsumer bufferConsumer;
+	class BufferAndChannel {
+		private final byte[] buffer;
 		private final int targetChannel;
 
-		public BufferConsumerAndChannel(BufferConsumer bufferConsumer, int targetChannel) {
-			this.bufferConsumer = checkNotNull(bufferConsumer);
+		public BufferAndChannel(byte[] buffer, int targetChannel) {
+			this.buffer = checkNotNull(buffer);
 			this.targetChannel = targetChannel;
 		}
 
-		public BufferConsumer getBufferConsumer() {
-			return bufferConsumer;
+		public byte[] getBuffer() {
+			return buffer;
 		}
 
 		public int getTargetChannel() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
index 60eee34..a14cdba 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionProducer.java
@@ -18,8 +18,12 @@
 
 package org.apache.flink.runtime.io.network.util;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
-import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferConsumerAndChannel;
+import org.apache.flink.runtime.io.network.util.TestProducerSource.BufferAndChannel;
 
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -69,10 +73,11 @@ public class TestSubpartitionProducer implements Callable<Boolean> {
 		boolean success = false;
 
 		try {
-			BufferConsumerAndChannel consumerAndChannel;
+			BufferAndChannel bufferAndChannel;
 
-			while ((consumerAndChannel = source.getNextBufferConsumer()) != null) {
-				subpartition.add(consumerAndChannel.getBufferConsumer());
+			while ((bufferAndChannel = source.getNextBuffer()) != null) {
+				MemorySegment segment = MemorySegmentFactory.wrap(bufferAndChannel.getBuffer());
+				subpartition.add(new BufferConsumer(segment, MemorySegment::free, Buffer.DataType.DATA_BUFFER));
 
 				// Check for interrupted flag after adding data to prevent resource leaks
 				if (Thread.interrupted()) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 2865204..a28a0b5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -202,7 +202,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	public void addOutput(final List<Record> outputList) {
 		try {
-			outputs.add(new RecordCollectingResultPartitionWriter(outputList, new TestPooledBufferProvider(Integer.MAX_VALUE)));
+			outputs.add(new RecordCollectingResultPartitionWriter(outputList));
 		}
 		catch (Throwable t) {
 			t.printStackTrace();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 082ddc5..a297460 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,11 +21,11 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.B
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
+import java.nio.ByteBuffer;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -81,7 +82,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 		for (int i = 0; i < numInputChannels; i++) {
 			final int channelIndex = i;
-			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
+			final DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128);
 			final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
 				new SerializationDelegate<>(new StreamElementSerializer<T>(serializer));
 
@@ -107,10 +108,10 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 					Object inputElement = input.getStreamRecord();
 
 					delegate.setInstance(inputElement);
-					recordSerializer.serializeRecord(delegate);
+					ByteBuffer serializedRecord = RecordWriter.serializeRecord(dataOutputSerializer, delegate);
 					BufferBuilder bufferBuilder = createBufferBuilder(bufferSize);
 					BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
-					recordSerializer.copyToBufferBuilder(bufferBuilder);
+					bufferBuilder.appendAndCommit(serializedRecord);
 					bufferBuilder.finish();
 
 					// Call getCurrentBuffer to ensure size is set
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
index 4c68ec3..48f833d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInputTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
@@ -27,9 +28,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
@@ -53,6 +53,7 @@ import org.junit.After;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -218,14 +219,15 @@ public class StreamTaskNetworkInputTest {
 	}
 
 	private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
-		RecordSerializer<SerializationDelegate<StreamElement>> serializer = new SpanningRecordSerializer<>();
+		DataOutputSerializer serializer = new DataOutputSerializer(128);
 		SerializationDelegate<StreamElement> serializationDelegate =
 			new SerializationDelegate<>(
 				new StreamElementSerializer<>(LongSerializer.INSTANCE));
 		serializationDelegate.setInstance(new StreamRecord<>(value));
-		serializer.serializeRecord(serializationDelegate);
+		ByteBuffer serializedRecord = RecordWriter.serializeRecord(serializer, serializationDelegate);
+		bufferBuilder.appendAndCommit(serializedRecord);
 
-		assertFalse(serializer.copyToBufferBuilder(bufferBuilder).isFullBuffer());
+		assertFalse(bufferBuilder.isFull());
 	}
 
 	private static void assertHasNextElement(StreamTaskNetworkInput input, DataOutput output) throws Exception {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index bf2b79c..447b9a0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
@@ -204,7 +203,6 @@ public class StreamMockEnvironment implements Environment {
 		try {
 			outputs.add(new RecordOrEventCollectingResultPartitionWriter<T>(
 				outputList,
-				new TestPooledBufferProvider(Integer.MAX_VALUE),
 				serializer));
 		}
 		catch (Throwable t) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 00ef9a1..87142d2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -252,11 +251,10 @@ public class SubtaskCheckpointCoordinatorTest {
 			.setEnvironment(mockEnvironment)
 			.build();
 
-		TestPooledBufferProvider bufferProvider = new TestPooledBufferProvider(1, 4096);
 		ArrayList<Object> recordOrEvents = new ArrayList<>();
 		StreamElementSerializer<String> stringStreamElementSerializer = new StreamElementSerializer<>(StringSerializer.INSTANCE);
 		ResultPartitionWriter resultPartitionWriter = new RecordOrEventCollectingResultPartitionWriter<>(
-			recordOrEvents, bufferProvider, stringStreamElementSerializer);
+			recordOrEvents, stringStreamElementSerializer);
 		mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
 
 		OneInputStreamTask<String, String> task = testHarness.getTask();