You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/08 13:04:04 UTC

[10/15] flink git commit: [FLINK-8178][network] Introduce not threadsafe write only BufferBuilder

[FLINK-8178][network] Introduce not threadsafe write only BufferBuilder

While Buffer class is used in multithreaded context it requires synchronisation.
Previously it was miss-leading and unclear, suggesting that RecordSerializer should
take into account synchronisation of the Buffer that's holding. With NotThreadSafe
BufferBuilder there is now clear separation between single-threaded writing/creating
a BufferBuilder and multithreaded Buffer handling/retaining/recycling.

This increases throughput of network stack by factor of 2, because previously
method getMemorySegment() was called twice per record and it is a synchronized
method on recycleLock, while RecordSerializer is sole owner of the Buffer at
this point, so synchronisation is not needed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6945c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6945c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6945c2e

Branch: refs/heads/master
Commit: c6945c2ef48d4c2cad3fc935435c1ab83e834969
Parents: 0888bb6
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Nov 28 16:49:37 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     |  17 +--
 .../serialization/SpanningRecordSerializer.java |  57 ++++-----
 .../io/network/api/writer/RecordWriter.java     |  10 +-
 .../flink/runtime/io/network/buffer/Buffer.java |   7 +-
 .../io/network/buffer/BufferBuilder.java        |  82 +++++++++++++
 .../io/network/buffer/BufferProvider.java       |   8 ++
 .../io/network/buffer/LocalBufferPool.java      |  15 ++-
 .../SpanningRecordSerializationTest.java        |  11 +-
 .../SpanningRecordSerializerTest.java           |  26 ++---
 .../io/network/api/writer/RecordWriterTest.java |  37 ++++--
 .../io/network/buffer/BufferBuilderTest.java    | 115 +++++++++++++++++++
 .../network/buffer/BufferBuilderTestUtils.java  |  32 ++++++
 .../IteratorWrappingTestSingleInputGate.java    |   9 +-
 .../network/serialization/LargeRecordsTest.java |  23 ++--
 .../network/util/TestPooledBufferProvider.java  |   7 ++
 .../consumer/StreamTestSingleInputGate.java     |  10 +-
 16 files changed, 352 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 5fe56c4..6a07f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -19,11 +19,12 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import java.io.IOException;
 
 /**
  * Interface for turning records into sequences of memory segments.
@@ -79,19 +80,19 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 * Sets a (next) target buffer to use and continues writing remaining data
 	 * to it until it is full.
 	 *
-	 * @param buffer the new target buffer to use
+	 * @param bufferBuilder the new target buffer to use
 	 * @return how much information was written to the target buffer and
 	 *         whether this buffer is full
 	 * @throws IOException
 	 */
-	SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+	SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
 
 	/**
 	 * Retrieves the current target buffer and sets its size to the actual
 	 * number of written bytes.
 	 *
 	 * After calling this method, a new target buffer is required to continue
-	 * writing (see {@link #setNextBuffer(Buffer)}).
+	 * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
 	 *
 	 * @return the target buffer that was used
 	 */
@@ -102,7 +103,7 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 *
 	 * <p><strong>NOTE:</strong> After calling this method, <strong>a new target
 	 * buffer is required to continue writing</strong> (see
-	 * {@link #setNextBuffer(Buffer)}).</p>
+	 * {@link #setNextBufferBuilder(BufferBuilder)}).</p>
 	 */
 	void clearCurrentBuffer();
 
@@ -112,7 +113,7 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 *
 	 * <p><strong>NOTE:</strong> After calling this method, a <strong>new record
 	 * and a new target buffer is required to start writing again</strong>
-	 * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+	 * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue
 	 * with the current record, use {@link #clearCurrentBuffer()} instead.</p>
 	 */
 	void clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 7394f83..efdfaa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -18,20 +18,23 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
 import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 /**
  * Record serializer which serializes the complete record to an intermediate
  * data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #setNextBuffer(Buffer)}.
+ * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}.
  *
  * @param <T>
  */
@@ -50,13 +53,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	private final ByteBuffer lengthBuffer;
 
 	/** Current target {@link Buffer} of the serializer */
-	private Buffer targetBuffer;
-
-	/** Position in current {@link MemorySegment} of target buffer */
-	private int position;
-
-	/** Limit of current {@link MemorySegment} of target buffer */
-	private int limit;
+	@Nullable
+	private BufferBuilder targetBuffer;
 
 	public SpanningRecordSerializer() {
 		serializationBuffer = new DataOutputSerializer(128);
@@ -64,7 +62,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		lengthBuffer = ByteBuffer.allocate(4);
 		lengthBuffer.order(ByteOrder.BIG_ENDIAN);
 
-		// ensure initial state with hasRemaining false (for correct setNextBuffer logic)
+		// ensure initial state with hasRemaining false (for correct setNextBufferBuilder logic)
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
 		lengthBuffer.position(4);
 	}
@@ -105,10 +103,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	@Override
-	public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
+	public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException {
 		targetBuffer = buffer;
-		position = 0;
-		limit = buffer.getSize();
 
 		if (lengthBuffer.hasRemaining()) {
 			copyToTargetBufferFrom(lengthBuffer);
@@ -140,19 +136,12 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		if (targetBuffer == null) {
 			return;
 		}
-
-		int needed = source.remaining();
-		int available = limit - position;
-		int toCopy = Math.min(needed, available);
-
-		targetBuffer.getMemorySegment().put(position, source, toCopy);
-
-		position += toCopy;
+		targetBuffer.append(source);
 	}
 
 	private SerializationResult getSerializationResult() {
 		if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) {
-			return (position < limit)
+			return !targetBuffer.isFull()
 					? SerializationResult.FULL_RECORD
 					: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
 		}
@@ -165,25 +154,21 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		if (targetBuffer == null) {
 			return null;
 		}
-
-		targetBuffer.setSize(position);
-		return targetBuffer;
+		Buffer result = targetBuffer.build();
+		targetBuffer = null;
+		return result;
 	}
 
 	@Override
 	public void clearCurrentBuffer() {
 		targetBuffer = null;
-		position = 0;
-		limit = 0;
 	}
 
 	@Override
 	public void clear() {
 		targetBuffer = null;
-		position = 0;
-		limit = 0;
 
-		// ensure clear state with hasRemaining false (for correct setNextBuffer logic)
+		// ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic)
 		dataBuffer.position(dataBuffer.limit());
 		lengthBuffer.position(4);
 	}
@@ -191,7 +176,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	@Override
 	public boolean hasData() {
 		// either data in current target buffer or intermediate buffers
-		return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining());
+		return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4729800..39dbacc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.io.network.api.writer;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.util.XORShiftRandom;
 
 import java.io.IOException;
@@ -129,8 +130,9 @@ public class RecordWriter<T extends IOReadableWritable> {
 						break;
 					}
 				} else {
-					buffer = targetPartition.getBufferProvider().requestBufferBlocking();
-					result = serializer.setNextBuffer(buffer);
+					BufferBuilder bufferBuilder =
+						targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+					result = serializer.setNextBufferBuilder(bufferBuilder);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index d7980d2..33516bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -54,11 +54,14 @@ public class Buffer {
 	}
 
 	public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+		this(memorySegment, recycler, isBuffer, memorySegment.size());
+	}
+
+	public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) {
 		this.memorySegment = checkNotNull(memorySegment);
 		this.recycler = checkNotNull(recycler);
 		this.isBuffer = isBuffer;
-
-		this.currentSize = memorySegment.size();
+		this.currentSize = size;
 	}
 
 	public boolean isBuffer() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
new file mode 100644
index 0000000..08e49b5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+	private final MemorySegment memorySegment;
+
+	private final BufferRecycler recycler;
+
+	private int position = 0;
+
+	private boolean built = false;
+
+	public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
+		this.memorySegment = checkNotNull(memorySegment);
+		this.recycler = checkNotNull(recycler);
+	}
+
+	/**
+	 * @return number of copied bytes
+	 */
+	public int append(ByteBuffer source) {
+		checkState(!built);
+
+		int needed = source.remaining();
+		int available = limit() - position;
+		int toCopy = Math.min(needed, available);
+
+		memorySegment.put(position, source, toCopy);
+		position += toCopy;
+		return toCopy;
+	}
+
+	public boolean isFull() {
+		checkState(position <= limit());
+		return position == limit();
+	}
+
+	public Buffer build() {
+		checkState(!built);
+		built = true;
+		return new Buffer(memorySegment, recycler, true, position);
+	}
+
+	public boolean isEmpty() {
+		return position == 0;
+	}
+
+	private int limit() {
+		return memorySegment.size();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 9782584..843a2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -44,6 +44,14 @@ public interface BufferProvider {
 	Buffer requestBufferBlocking() throws IOException, InterruptedException;
 
 	/**
+	 * Returns a {@link BufferBuilder} instance from the buffer provider.
+	 *
+	 * <p>If there is no buffer available, the call will block until one becomes available again or the
+	 * buffer provider has been destroyed.
+	 */
+	BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException;
+
+	/**
 	 * Adds a buffer availability listener to the buffer provider.
 	 *
 	 * <p>The operation fails with return value <code>false</code>, when there is a buffer available or

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a66373c..7403bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -179,7 +179,8 @@ class LocalBufferPool implements BufferPool {
 	@Override
 	public Buffer requestBuffer() throws IOException {
 		try {
-			return requestBuffer(false);
+			BufferBuilder bufferBuilder = requestBufferBuilder(false);
+			return bufferBuilder != null ? bufferBuilder.build() : null;
 		}
 		catch (InterruptedException e) {
 			throw new IOException(e);
@@ -188,10 +189,16 @@ class LocalBufferPool implements BufferPool {
 
 	@Override
 	public Buffer requestBufferBlocking() throws IOException, InterruptedException {
-		return requestBuffer(true);
+		BufferBuilder bufferBuilder = requestBufferBuilder(true);
+		return bufferBuilder != null ? bufferBuilder.build() : null;
 	}
 
-	private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {
+	@Override
+	public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+		return requestBufferBuilder(true);
+	}
+
+	private BufferBuilder requestBufferBuilder(boolean isBlocking) throws InterruptedException, IOException {
 		synchronized (availableMemorySegments) {
 			returnExcessMemorySegments();
 
@@ -226,7 +233,7 @@ class LocalBufferPool implements BufferPool {
 				}
 			}
 
-			return new Buffer(availableMemorySegments.poll(), this);
+			return new BufferBuilder(availableMemorySegments.poll(), this);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index f988c55..ed0ce6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,19 +19,16 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayDeque;
 
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
 public class SpanningRecordSerializationTest {
 
@@ -129,13 +126,11 @@ public class SpanningRecordSerializationTest {
 	{
 		final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
-		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
-
 		final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		serializer.setNextBuffer(buffer);
+		serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
 		int numBytes = 0;
 		int numRecords = 0;
@@ -164,7 +159,7 @@ public class SpanningRecordSerializationTest {
 					}
 				}
 
-				while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+				while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) {
 					deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index fe9a386..ed6677e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,19 +21,17 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
 import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
 import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.Random;
 
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 
 public class SpanningRecordSerializerTest {
 
@@ -42,7 +40,6 @@ public class SpanningRecordSerializerTest {
 		final int SEGMENT_SIZE = 16;
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 		final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
 
 		Assert.assertFalse(serializer.hasData());
@@ -51,13 +48,13 @@ public class SpanningRecordSerializerTest {
 			serializer.addRecord(randomIntRecord);
 			Assert.assertTrue(serializer.hasData());
 
-			serializer.setNextBuffer(buffer);
+			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 			Assert.assertTrue(serializer.hasData());
 
 			serializer.clear();
 			Assert.assertFalse(serializer.hasData());
 
-			serializer.setNextBuffer(buffer);
+			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 
 			serializer.addRecord(randomIntRecord);
 			Assert.assertTrue(serializer.hasData());
@@ -76,10 +73,11 @@ public class SpanningRecordSerializerTest {
 		final int SEGMENT_SIZE = 11;
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
 
 		try {
-			Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
+			Assert.assertEquals(
+				RecordSerializer.SerializationResult.FULL_RECORD,
+				serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)));
 		} catch (IOException e) {
 			e.printStackTrace();
 		}
@@ -122,7 +120,7 @@ public class SpanningRecordSerializerTest {
 			result = serializer.addRecord(emptyRecord);
 			Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
-			result = serializer.setNextBuffer(buffer);
+			result = serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 			Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 		}
 		catch (Exception e) {
@@ -202,11 +200,10 @@ public class SpanningRecordSerializerTest {
 		final int SERIALIZATION_OVERHEAD = 4; // length encoding
 
 		final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
-		final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
 
 		// -------------------------------------------------------------------------------------------------------------
 
-		serializer.setNextBuffer(buffer);
+		serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 
 		int numBytes = 0;
 		for (SerializationTestType record : records) {
@@ -217,14 +214,15 @@ public class SpanningRecordSerializerTest {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
 			} else if (numBytes == segmentSize) {
 				Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
-				serializer.setNextBuffer(buffer);
+				serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 				numBytes = 0;
 			} else {
 				Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
 
 				while (result.isFullBuffer()) {
 					numBytes -= segmentSize;
-					result = serializer.setNextBuffer(buffer);
+
+					result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 59b98a2..f0eaa94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -31,11 +31,12 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -54,6 +55,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -70,7 +73,6 @@ import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -97,16 +99,18 @@ public class RecordWriterTest {
 
 			final CountDownLatch sync = new CountDownLatch(2);
 
-			final Buffer buffer = spy(TestBufferFactory.createBuffer(4));
+			final TrackingBufferRecycler recycler = new TrackingBufferRecycler();
+
+			final MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(4);
 
 			// Return buffer for first request, but block for all following requests.
-			Answer<Buffer> request = new Answer<Buffer>() {
+			Answer<BufferBuilder> request = new Answer<BufferBuilder>() {
 				@Override
-				public Buffer answer(InvocationOnMock invocation) throws Throwable {
+				public BufferBuilder answer(InvocationOnMock invocation) throws Throwable {
 					sync.countDown();
 
 					if (sync.getCount() == 1) {
-						return buffer;
+						return new BufferBuilder(memorySegment, recycler);
 					}
 
 					final Object o = new Object();
@@ -119,7 +123,7 @@ public class RecordWriterTest {
 			};
 
 			BufferProvider bufferProvider = mock(BufferProvider.class);
-			when(bufferProvider.requestBufferBlocking()).thenAnswer(request);
+			when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
 
 			ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
 
@@ -156,13 +160,13 @@ public class RecordWriterTest {
 			recordWriter.clearBuffers();
 
 			// Verify that buffer have been requested, but only one has been written out.
-			verify(bufferProvider, times(2)).requestBufferBlocking();
+			verify(bufferProvider, times(2)).requestBufferBuilderBlocking();
 			verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
 
 			// Verify that the written out buffer has only been recycled once
 			// (by the partition writer).
-			assertTrue("Buffer not recycled.", buffer.isRecycled());
-			verify(buffer, times(1)).recycle();
+			assertEquals(1, recycler.getRecycledMemorySegments().size());
+			assertEquals(memorySegment, recycler.getRecycledMemorySegments().get(0));
 		}
 		finally {
 			if (executor != null) {
@@ -566,4 +570,17 @@ public class RecordWriterTest {
 			return nextChannel;
 		}
 	}
+
+	private static class TrackingBufferRecycler implements BufferRecycler {
+		private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
+
+		@Override
+		public synchronized void recycle(MemorySegment memorySegment) {
+			recycledMemorySegments.add(memorySegment);
+		}
+
+		public synchronized List<MemorySegment> getRecycledMemorySegments() {
+			return recycledMemorySegments;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
new file mode 100644
index 0000000..3805274
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BufferBuilder}.
+ */
+public class BufferBuilderTest {
+	private static final int BUFFER_SIZE = 10 * Integer.BYTES;
+
+	@Test
+	public void append() {
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
+		ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
+
+		assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite));
+
+		assertEquals(bytesToWrite.limit(), bytesToWrite.position());
+		assertFalse(bufferBuilder.isFull());
+		Buffer buffer = bufferBuilder.build();
+		assertBufferContent(buffer, intsToWrite);
+		assertEquals(5 * Integer.BYTES, buffer.getSize());
+		assertEquals(DiscardingRecycler.INSTANCE, buffer.getRecycler());
+	}
+
+	@Test
+	public void multipleAppends() {
+		BufferBuilder bufferBuilder = createBufferBuilder();
+
+		bufferBuilder.append(toByteBuffer(0, 1));
+		bufferBuilder.append(toByteBuffer(2));
+		bufferBuilder.append(toByteBuffer(3, 42));
+
+		Buffer buffer = bufferBuilder.build();
+		assertBufferContent(buffer, 0, 1, 2, 3, 42);
+		assertEquals(5 * Integer.BYTES, buffer.getSize());
+	}
+
+	@Test
+	public void appendOverSize() {
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42);
+
+		assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
+
+		assertTrue(bufferBuilder.isFull());
+		Buffer buffer = bufferBuilder.build();
+		assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+		assertEquals(BUFFER_SIZE, buffer.getSize());
+
+		bufferBuilder = createBufferBuilder();
+		assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
+
+		assertFalse(bufferBuilder.isFull());
+		buffer = bufferBuilder.build();
+		assertBufferContent(buffer, 42);
+		assertEquals(Integer.BYTES, buffer.getSize());
+	}
+
+	@Test
+	public void buildEmptyBuffer() {
+		Buffer buffer = createBufferBuilder().build();
+		assertEquals(0, buffer.getSize());
+		assertBufferContent(buffer);
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void buildingBufferTwice() {
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		bufferBuilder.build();
+		bufferBuilder.build();
+	}
+
+	private static ByteBuffer toByteBuffer(int... data) {
+		ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES);
+		byteBuffer.asIntBuffer().put(data);
+		return byteBuffer;
+	}
+
+	private static void assertBufferContent(Buffer actualBuffer, int... expected) {
+		assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer());
+	}
+
+	private static BufferBuilder createBufferBuilder() {
+		return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), DiscardingRecycler.INSTANCE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
new file mode 100644
index 0000000..1113664
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utility class for create not-pooled {@link BufferBuilder}.
+ */
+public class BufferBuilderTestUtils {
+	public static BufferBuilder createBufferBuilder(int size) {
+		return new BufferBuilder(
+			MemorySegmentFactory.allocateUnpooledSegment(size),
+			FreeingBufferRecycler.INSTANCE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index fa44393..16285b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,22 +19,20 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
+
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
 
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.when;
 
 public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
@@ -71,8 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 			@Override
 			public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
 				if (hasData) {
-					final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
-					serializer.setNextBuffer(buffer);
+					serializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
 					serializer.addRecord(reuse);
 
 					hasData = inputIterator.next(reuse) != null;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d596863..057b917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,28 +18,27 @@
 
 package org.apache.flink.runtime.io.network.serialization;
 
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.testutils.serialization.types.IntType;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.apache.flink.testutils.serialization.types.IntType;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
 
 public class LargeRecordsTest {
 
@@ -52,8 +51,6 @@ public class LargeRecordsTest {
 			final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
 			final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
 
-			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
-
 			List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
 			List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
 			
@@ -73,7 +70,7 @@ public class LargeRecordsTest {
 
 			// -------------------------------------------------------------------------------------------------------------
 
-			serializer.setNextBuffer(buffer);
+			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 			
 			int numRecordsDeserialized = 0;
 			
@@ -98,7 +95,7 @@ public class LargeRecordsTest {
 					}
 
 					// move buffers as long as necessary (for long records)
-					while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+					while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) {
 						deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE);
 					}
 					
@@ -152,8 +149,6 @@ public class LargeRecordsTest {
 					new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
 							new String[] { System.getProperty("java.io.tmpdir") } );
 
-			final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
-
 			List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
 			List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
 			
@@ -173,7 +168,7 @@ public class LargeRecordsTest {
 
 			// -------------------------------------------------------------------------------------------------------------
 
-			serializer.setNextBuffer(buffer);
+			serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
 			
 			int numRecordsDeserialized = 0;
 			
@@ -198,7 +193,7 @@ public class LargeRecordsTest {
 					}
 
 					// move buffers as long as necessary (for long records)
-					while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+					while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) {
 						deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE);
 					}
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index cc52549..eb80578 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.util;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -79,6 +80,12 @@ public class TestPooledBufferProvider implements BufferProvider {
 	}
 
 	@Override
+	public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+		Buffer buffer = requestBufferBlocking();
+		return new BufferBuilder(buffer.getMemorySegment(), buffer.getRecycler());
+	}
+
+	@Override
 	public boolean addBufferListener(BufferListener listener) {
 		return bufferRecycler.registerListener(listener);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index e14430e..f19d59d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,14 +21,11 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
 import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -41,8 +38,8 @@ import org.mockito.stubbing.Answer;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 /**
@@ -104,11 +101,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 						return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
 					} else if (input != null && input.isStreamRecord()) {
 						Object inputElement = input.getStreamRecord();
-						final Buffer buffer = new Buffer(
-							MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-							mock(BufferRecycler.class));
 
-						recordSerializer.setNextBuffer(buffer);
+						recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
 						delegate.setInstance(inputElement);
 						recordSerializer.addRecord(delegate);