You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:25:57 UTC

[6/9] flink git commit: [FLINK-7520][network] let our Buffer class extend from netty's buffer class

[FLINK-7520][network] let our Buffer class extend from netty's buffer class

For this, use a common (flink) Buffer interface and an implementation
(NetworkBuffer) that implements netty's buffer methods as well. In the future,
with this, we are able to avoid unnecessary buffer copies when handing buffers
over to netty while keeping our MemorySegment logic and configuration.

For the netty-specific part, the NetworkBuffer also requires a ByteBuf allocator
which is otherwise not needed in our use cases, so if the buffer is handed over
to netty, it requires a byte buffer allocator to be set.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/85bea23a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/85bea23a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/85bea23a

Branch: refs/heads/master
Commit: 85bea23ace3133e1b2d239c4ee87270a201b9b6a
Parents: 1a5a355
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 9 17:08:50 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:24:16 2018 +0100

----------------------------------------------------------------------
 .../flink/core/memory/HybridMemorySegment.java  |    8 -
 .../apache/flink/core/memory/MemorySegment.java |   32 +
 .../iomanager/AsynchronousFileIOChannel.java    |   15 +-
 .../iomanager/SynchronousBufferFileReader.java  |   10 +-
 .../api/serialization/EventSerializer.java      |    7 +-
 .../flink/runtime/io/network/buffer/Buffer.java |  258 +-
 .../io/network/buffer/BufferBuilder.java        |    2 +-
 .../io/network/buffer/LocalBufferPool.java      |    3 +-
 .../io/network/buffer/NetworkBuffer.java        |  526 +++
 .../network/netty/CreditBasedClientHandler.java |   13 +-
 .../runtime/io/network/netty/NettyMessage.java  |   10 +-
 .../netty/PartitionRequestClientHandler.java    |   25 +-
 .../partition/SpilledSubpartitionView.java      |    5 +-
 .../partition/consumer/RemoteInputChannel.java  |   10 +-
 .../AsynchronousBufferFileWriterTest.java       |    3 +-
 .../BufferFileWriterFileSegmentReaderTest.java  |   41 +-
 .../iomanager/BufferFileWriterReaderTest.java   |   21 +-
 .../io/network/api/writer/RecordWriterTest.java |    3 +-
 .../io/network/buffer/AbstractByteBufTest.java  | 3077 ++++++++++++++++++
 .../io/network/buffer/BufferBuilderTest.java    |    2 +-
 .../runtime/io/network/buffer/BufferTest.java   |  384 ++-
 .../netty/CancelPartitionRequestTest.java       |    6 +-
 .../netty/NettyMessageSerializationTest.java    |    6 +-
 .../PartitionRequestClientHandlerTest.java      |   15 +-
 .../partition/PipelinedSubpartitionTest.java    |   14 +-
 .../network/partition/ResultPartitionTest.java  |   14 +-
 .../partition/SpillableSubpartitionTest.java    |   20 +-
 .../partition/SpilledSubpartitionViewTest.java  |    2 +-
 .../network/partition/SubpartitionTestBase.java |    9 +-
 .../consumer/RemoteInputChannelTest.java        |    4 +-
 .../partition/consumer/SingleInputGateTest.java |    4 +-
 .../io/network/util/TestBufferFactory.java      |   33 +-
 .../network/util/TestPooledBufferProvider.java  |    3 +-
 .../streaming/runtime/io/BufferSpiller.java     |    5 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |    4 +-
 .../streaming/runtime/io/BarrierBufferTest.java |    4 +-
 .../runtime/io/BarrierTrackerTest.java          |    4 +-
 .../streaming/runtime/io/BufferSpillerTest.java |    3 +-
 pom.xml                                         |    2 +
 tools/maven/suppressions-runtime.xml            |    4 +
 40 files changed, 4307 insertions(+), 304 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
index 13a2644..115718e 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/HybridMemorySegment.java
@@ -110,14 +110,6 @@ public final class HybridMemorySegment extends MemorySegment {
 	//  MemorySegment operations
 	// -------------------------------------------------------------------------
 
-	public byte[] getArray() {
-		if (heapMemory != null) {
-			return heapMemory;
-		} else {
-			throw new IllegalStateException("Memory segment does not represent heap memory");
-		}
-	}
-
 	/**
 	 * Gets the buffer that owns the memory of this memory segment.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
index 44b5eca..d169545 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegment.java
@@ -238,6 +238,38 @@ public abstract class MemorySegment {
 	}
 
 	/**
+	 * Returns the byte array of on-heap memory segments.
+	 *
+	 * @return underlying byte array
+	 *
+	 * @throws IllegalStateException
+	 * 		if the memory segment does not represent on-heap memory
+	 */
+	public byte[] getArray() {
+		if (heapMemory != null) {
+			return heapMemory;
+		} else {
+			throw new IllegalStateException("Memory segment does not represent heap memory");
+		}
+	}
+
+	/**
+	 * Returns the memory address of off-heap memory segments.
+	 *
+	 * @return absolute memory address outside the heap
+	 *
+	 * @throws IllegalStateException
+	 * 		if the memory segment does not represent off-heap memory
+	 */
+	public long getAddress() {
+		if (heapMemory == null) {
+			return address;
+		} else {
+			throw new IllegalStateException("Memory segment does not represent off heap memory");
+		}
+	}
+
+	/**
 	 * Wraps the chunk of the underlying memory located between <tt>offset</tt> and
 	 * <tt>length</tt> in a NIO ByteBuffer.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
index 61a934a..554ea36 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousFileIOChannel.java
@@ -28,6 +28,7 @@ import java.nio.channels.FileChannel;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -367,14 +368,16 @@ final class BufferWriteRequest implements WriteRequest {
 
 	@Override
 	public void write() throws IOException {
+		ByteBuffer nioBufferReadable = buffer.getNioBufferReadable();
+
 		final ByteBuffer header = ByteBuffer.allocateDirect(8);
 
 		header.putInt(buffer.isBuffer() ? 1 : 0);
-		header.putInt(buffer.getSize());
+		header.putInt(nioBufferReadable.remaining());
 		header.flip();
 
 		channel.fileChannel.write(header);
-		channel.fileChannel.write(buffer.getNioBuffer());
+		channel.fileChannel.write(nioBufferReadable);
 	}
 
 	@Override
@@ -411,14 +414,14 @@ final class BufferReadRequest implements ReadRequest {
 			final boolean isBuffer = header.getInt() == 1;
 			final int size = header.getInt();
 
-			if (size > buffer.getMemorySegment().size()) {
-				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+			if (size > buffer.getMaxCapacity()) {
+				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMaxCapacity() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
 			}
+			checkArgument(buffer.getSize() == 0, "Buffer not empty");
 
+			fileChannel.read(buffer.getNioBuffer(0, size));
 			buffer.setSize(size);
 
-			fileChannel.read(buffer.getNioBuffer());
-
 			if (!isBuffer) {
 				buffer.tagAsEvent();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
index 27189cb..f334153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * A synchronous {@link BufferFileReader} implementation.
  *
@@ -54,14 +56,14 @@ public class SynchronousBufferFileReader extends SynchronousFileIOChannel implem
 			final boolean isBuffer = header.getInt() == 1;
 			final int size = header.getInt();
 
-			if (size > buffer.getMemorySegment().size()) {
-				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMemorySegment().size() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
+			if (size > buffer.getMaxCapacity()) {
+				throw new IllegalStateException("Buffer is too small for data: " + buffer.getMaxCapacity() + " bytes available, but " + size + " needed. This is most likely due to an serialized event, which is larger than the buffer size.");
 			}
+			checkArgument(buffer.getSize() == 0, "Buffer not empty");
 
+			fileChannel.read(buffer.getNioBuffer(0, size));
 			buffer.setSize(size);
 
-			fileChannel.read(buffer.getNioBuffer());
-
 			if (!isBuffer) {
 				buffer.tagAsEvent();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 742410e..f1f238f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.core.memory.DataInputDeserializer;
 import org.apache.flink.core.memory.DataOutputSerializer;
 import org.apache.flink.util.InstantiationUtil;
@@ -275,14 +276,14 @@ public class EventSerializer {
 
 		MemorySegment data = MemorySegmentFactory.wrap(serializedEvent.array());
 		
-		final Buffer buffer = new Buffer(data, FreeingBufferRecycler.INSTANCE, false);
+		final Buffer buffer = new NetworkBuffer(data, FreeingBufferRecycler.INSTANCE, false);
 		buffer.setSize(serializedEvent.remaining());
 
 		return buffer;
 	}
 
 	public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) throws IOException {
-		return fromSerializedEvent(buffer.getNioBuffer(), classLoader);
+		return fromSerializedEvent(buffer.getNioBufferReadable(), classLoader);
 	}
 
 	/**
@@ -298,6 +299,6 @@ public class EventSerializer {
 		final Class<?> eventClass,
 		final ClassLoader classLoader) throws IOException {
 		return !buffer.isBuffer() &&
-			isEvent(buffer.getNioBuffer(), eventClass, classLoader);
+			isEvent(buffer.getNioBufferReadable(), eventClass, classLoader);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 33516bf..12acfdb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -22,141 +22,153 @@ import org.apache.flink.core.memory.MemorySegment;
 
 import java.nio.ByteBuffer;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
- * Wrapper for pooled {@link MemorySegment} instances.
+ * Wrapper for pooled {@link MemorySegment} instances with reference counting.
+ *
+ * <p>This is similar to Netty's <tt>ByteBuf</tt> with some extensions and restricted to the methods
+ * our use cases outside Netty handling use. In particular, we use two different indexes for read
+ * and write operations, i.e. the <tt>reader</tt> and <tt>writer</tt> index (size of written data),
+ * which specify three regions inside the memory segment:
+ * <pre>
+ *     +-------------------+----------------+----------------+
+ *     | discardable bytes | readable bytes | writable bytes |
+ *     +-------------------+----------------+----------------+
+ *     |                   |                |                |
+ *     0      <=      readerIndex  <=  writerIndex   <=  max capacity
+ * </pre>
+ *
+ * <p>Our non-Netty usages of this <tt>Buffer</tt> class either rely on the underlying {@link
+ * #getMemorySegment()} directly, or on {@link ByteBuffer} wrappers of this buffer which do not
+ * modify either index, so the indices need to be updated manually via {@link #setReaderIndex(int)}
+ * and {@link #setSize(int)}.
  */
-public class Buffer {
+public interface Buffer {
 
-	/** The backing {@link MemorySegment} instance */
-	private final MemorySegment memorySegment;
+	/**
+	 * Returns whether this buffer represents a buffer or an event.
+	 *
+	 * @return <tt>true</tt> if this is a real buffer, <tt>false</tt> if this is an event
+	 */
+	boolean isBuffer();
 
-	private final Object recycleLock = new Object();
+	/**
+	 * Tags this buffer to represent an event.
+	 */
+	void tagAsEvent();
 
-	/** The recycler for the backing {@link MemorySegment} */
-	private final BufferRecycler recycler;
+	/**
+	 * Returns the underlying memory segment.
+	 *
+	 * @return the memory segment backing this buffer
+	 */
+	MemorySegment getMemorySegment();
 
-	private boolean isBuffer;
+	/**
+	 * Gets the buffer's recycler.
+	 *
+	 * @return buffer recycler
+	 */
+	BufferRecycler getRecycler();
 
-	/** The current number of references to this buffer */
-	private int referenceCount = 1;
+	/**
+	 * Releases this buffer once, i.e. reduces the reference count and recycles the buffer if the
+	 * reference count reaches <tt>0</tt>.
+	 *
+	 * @see #retain()
+	 */
+	void recycle();
+
+	/**
+	 * Returns whether this buffer has been recycled or not.
+	 *
+	 * @return <tt>true</tt> if already recycled, <tt>false</tt> otherwise
+	 */
+	boolean isRecycled();
+
+	/**
+	 * Retains this buffer for further use, increasing the reference counter by <tt>1</tt>.
+	 *
+	 * @return <tt>this</tt> instance (for chained calls)
+	 *
+	 * @see #recycle()
+	 */
+	Buffer retain();
 
 	/**
-	 * The current size of the buffer in the range from 0 (inclusive) to the
-	 * size of the backing {@link MemorySegment} (inclusive).
+	 * Returns the maximum size of the buffer, i.e. the capacity of the underlying {@link MemorySegment}.
+	 *
+	 * @return size of the buffer
 	 */
-	private int currentSize;
-
-	public Buffer(MemorySegment memorySegment, BufferRecycler recycler) {
-		this(memorySegment, recycler, true);
-	}
-
-	public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
-		this(memorySegment, recycler, isBuffer, memorySegment.size());
-	}
-
-	public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) {
-		this.memorySegment = checkNotNull(memorySegment);
-		this.recycler = checkNotNull(recycler);
-		this.isBuffer = isBuffer;
-		this.currentSize = size;
-	}
-
-	public boolean isBuffer() {
-		return isBuffer;
-	}
-
-	public void tagAsEvent() {
-		synchronized (recycleLock) {
-			ensureNotRecycled();
-		}
-
-		isBuffer = false;
-	}
-
-	public MemorySegment getMemorySegment() {
-		synchronized (recycleLock) {
-			ensureNotRecycled();
-
-			return memorySegment;
-		}
-	}
-
-	public ByteBuffer getNioBuffer() {
-		synchronized (recycleLock) {
-			ensureNotRecycled();
-			// the memory segment returns a distinct buffer every time
-			return memorySegment.wrap(0, currentSize);
-		}
-	}
-	
-	public BufferRecycler getRecycler(){
-		return recycler;
-	}
+	int getMaxCapacity();
 
 	/**
-	 * Returns the size of this buffer without synchronization.
+	 * Returns the <tt>reader index</tt> of this buffer.
+	 *
+	 * <p>This is where readable (unconsumed) bytes start in the backing memory segment.
+	 *
+	 * @return reader index (from 0 (inclusive) to the size of the backing {@link MemorySegment}
+	 * (inclusive))
+	 */
+	int getReaderIndex();
+
+	/**
+	 * Sets the <tt>reader index</tt> of this buffer.
+	 *
+	 * @throws IndexOutOfBoundsException
+	 * 		if the index is less than <tt>0</tt> or greater than {@link #getSize()}
+	 */
+	void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException;
+
+	/**
+	 * Returns the size of the written data, i.e. the <tt>writer index</tt>, of this buffer in a
+	 * non-synchronized fashion.
+	 *
+	 * <p>This is where writable bytes start in the backing memory segment.
+	 *
+	 * @return writer index (from 0 (inclusive) to the size of the backing {@link MemorySegment}
+	 * (inclusive))
+	 */
+	int getSizeUnsafe();
+
+	/**
+	 * Returns the size of the written data, i.e. the <tt>writer index</tt>, of this buffer.
+	 *
+	 * <p>This is where writable bytes start in the backing memory segment.
+	 *
+	 * @return writer index (from 0 (inclusive) to the size of the backing {@link MemorySegment}
+	 * (inclusive))
+	 */
+	int getSize();
+
+	/**
+	 * Sets the size of the written data, i.e. the <tt>writer index</tt>, of this buffer.
+	 *
+	 * @throws IndexOutOfBoundsException
+	 * 		if the index is less than {@link #getReaderIndex()} or greater than {@link #getMaxCapacity()}
+	 */
+	void setSize(int writerIndex);
+
+	/**
+	 * Gets a new {@link ByteBuffer} instance wrapping this buffer's readable bytes, i.e. between
+	 * {@link #getReaderIndex()} and {@link #getSize()}.
+	 *
+	 * <p>Please note that neither index is updated by the returned buffer.
+	 *
+	 * @return byte buffer sharing the contents of the underlying memory segment
+	 */
+	ByteBuffer getNioBufferReadable();
+
+	/**
+	 * Gets a new {@link ByteBuffer} instance wrapping this buffer's bytes.
+	 *
+	 * <p>Please note that neither <tt>read</tt> nor <tt>write</tt> index is updated by the
+	 * returned buffer.
+	 *
+	 * @return byte buffer sharing the contents of the underlying memory segment
 	 *
-	 * @return size of this buffer
+	 * @throws IndexOutOfBoundsException
+	 * 		if the indexes are not within the buffer's bounds
+	 * @see #getNioBufferReadable()
 	 */
-	public int getSizeUnsafe() {
-		return currentSize;
-	}
-
-	public int getSize() {
-		synchronized (recycleLock) {
-			return currentSize;
-		}
-	}
-
-	public void setSize(int newSize) {
-		synchronized (recycleLock) {
-			ensureNotRecycled();
-
-			if (newSize < 0 || newSize > memorySegment.size()) {
-				throw new IllegalArgumentException("Size of buffer must be >= 0 and <= " +
-													memorySegment.size() + ", but was " + newSize + ".");
-			}
-
-			currentSize = newSize;
-		}
-	}
-
-	public void recycle() {
-		synchronized (recycleLock) {
-			if (--referenceCount == 0) {
-				recycler.recycle(memorySegment);
-			}
-		}
-	}
-
-	public Buffer retain() {
-		synchronized (recycleLock) {
-			ensureNotRecycled();
-
-			referenceCount++;
-
-			return this;
-		}
-	}
-
-	public boolean isRecycled() {
-		synchronized (recycleLock) {
-			return referenceCount == 0;
-		}
-	}
-
-	// Must be called from synchronized scope
-	private void ensureNotRecycled() {
-		checkState(referenceCount > 0, "Buffer has already been recycled.");
-	}
-
-	@Override
-	public String toString() {
-		synchronized (recycleLock) {
-			return String.format("Buffer %s [size: %d, reference count: %d]", hashCode(), currentSize, referenceCount);
-		}
-	}
+	ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 08e49b5..ff59f96 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -69,7 +69,7 @@ public class BufferBuilder {
 	public Buffer build() {
 		checkState(!built);
 		built = true;
-		return new Buffer(memorySegment, recycler, true, position);
+		return new NetworkBuffer(memorySegment, recycler, true, position);
 	}
 
 	public boolean isEmpty() {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 7403bd3..a36d6be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -252,7 +253,7 @@ class LocalBufferPool implements BufferPool {
 				}
 				else {
 					try {
-						boolean needMoreBuffers = listener.notifyBufferAvailable(new Buffer(segment, this));
+						boolean needMoreBuffers = listener.notifyBufferAvailable(new NetworkBuffer(segment, this));
 						if (needMoreBuffers) {
 							registeredListeners.add(listener);
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
new file mode 100644
index 0000000..4486caa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Wrapper for pooled {@link MemorySegment} instances.
+ *
+ * <p><strong>NOTE:</strong> before using this buffer in the netty stack, a buffer allocator must be
+ * set via {@link #setAllocator(ByteBufAllocator)}!
+ */
+public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Buffer {
+
+	/** The backing {@link MemorySegment} instance. */
+	private final MemorySegment memorySegment;
+
+	/** The recycler for the backing {@link MemorySegment}. */
+	private final BufferRecycler recycler;
+
+	/** Whether this buffer represents a buffer or an event. */
+	private boolean isBuffer;
+
+	/** Allocator for further byte buffers (needed by netty). */
+	private ByteBufAllocator allocator;
+
+	/**
+	 * The current size of the buffer in the range from 0 (inclusive) to the
+	 * size of the backing {@link MemorySegment} (inclusive).
+	 */
+	private int currentSize;
+
+	/**
+	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
+	 * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+	 *
+	 * @param memorySegment
+	 * 		backing memory segment (defines {@link #maxCapacity})
+	 * @param recycler
+	 * 		will be called to recycle this buffer once the reference count is <tt>0</tt>
+	 */
+	public NetworkBuffer(MemorySegment memorySegment, BufferRecycler recycler) {
+		this(memorySegment, recycler, true);
+	}
+
+	/**
+	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
+	 * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+	 *
+	 * @param memorySegment
+	 * 		backing memory segment (defines {@link #maxCapacity})
+	 * @param recycler
+	 * 		will be called to recycle this buffer once the reference count is <tt>0</tt>
+	 * @param isBuffer
+	 * 		whether this buffer represents a buffer (<tt>true</tt>) or an event (<tt>false</tt>)
+	 */
+	public NetworkBuffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+		this(memorySegment, recycler, isBuffer, 0);
+	}
+
+	/**
+	 * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for
+	 * the <tt>readerIndex</tt> and <tt>size</tt> as the <tt>writerIndex</tt>.
+	 *
+	 * @param memorySegment
+	 * 		backing memory segment (defines {@link #maxCapacity})
+	 * @param recycler
+	 * 		will be called to recycle this buffer once the reference count is <tt>0</tt>
+	 * @param isBuffer
+	 * 		whether this buffer represents a buffer (<tt>true</tt>) or an event (<tt>false</tt>)
+	 * @param size
+	 * 		current size of data in the buffer, i.e. the writer index to set
+	 */
+	public NetworkBuffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) {
+		super(memorySegment.size());
+		this.memorySegment = checkNotNull(memorySegment);
+		this.recycler = checkNotNull(recycler);
+		this.isBuffer = isBuffer;
+		this.currentSize = memorySegment.size();
+		setSize(size);
+	}
+
+	@Override
+	public boolean isBuffer() {
+		return isBuffer;
+	}
+
+	@Override
+	public void tagAsEvent() {
+		ensureAccessible();
+
+		isBuffer = false;
+	}
+
+	@Override
+	public MemorySegment getMemorySegment() {
+		ensureAccessible();
+
+		return memorySegment;
+	}
+
+	@Override
+	public BufferRecycler getRecycler(){
+		return recycler;
+	}
+
+	@Override
+	public void recycle() {
+		release();
+	}
+
+	@Override
+	public boolean isRecycled() {
+		return refCnt() == 0;
+	}
+
+	@Override
+	public NetworkBuffer retain() {
+		return (NetworkBuffer) super.retain();
+	}
+
+	@Override
+	protected void deallocate() {
+		recycler.recycle(memorySegment);
+	}
+
+	@Override
+	protected byte _getByte(int index) {
+		return memorySegment.get(index);
+	}
+
+	@Override
+	protected short _getShort(int index) {
+		return memorySegment.getShortBigEndian(index);
+	}
+
+	@Override
+	protected int _getUnsignedMedium(int index) {
+		// from UnpooledDirectByteBuf:
+		return (getByte(index) & 0xff) << 16 | (getByte(index + 1) & 0xff) << 8 | getByte(index + 2) & 0xff;
+	}
+
+	@Override
+	protected int _getInt(int index) {
+		return memorySegment.getIntBigEndian(index);
+	}
+
+	@Override
+	protected long _getLong(int index) {
+		return memorySegment.getLongBigEndian(index);
+	}
+
+	@Override
+	protected void _setByte(int index, int value) {
+		memorySegment.put(index, (byte) value);
+	}
+
+	@Override
+	protected void _setShort(int index, int value) {
+		memorySegment.putShortBigEndian(index, (short) value);
+	}
+
+	@Override
+	protected void _setMedium(int index, int value) {
+		// from UnpooledDirectByteBuf:
+		setByte(index, (byte) (value >>> 16));
+		setByte(index + 1, (byte) (value >>> 8));
+		setByte(index + 2, (byte) value);
+	}
+
+	@Override
+	protected void _setInt(int index, int value) {
+		memorySegment.putIntBigEndian(index, value);
+	}
+
+	@Override
+	protected void _setLong(int index, long value) {
+		memorySegment.putLongBigEndian(index, value);
+	}
+
+	@Override
+	public int capacity() {
+		return currentSize;
+	}
+
+	@Override
+	public int getMaxCapacity() {
+		return maxCapacity();
+	}
+
+	@Override
+	public int getReaderIndex() {
+		return readerIndex();
+	}
+
+	@Override
+	public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+		readerIndex(readerIndex);
+	}
+
+	@Override
+	public int getSizeUnsafe() {
+		return writerIndex();
+	}
+
+	@Override
+	public int getSize() {
+		return writerIndex();
+	}
+
+	@Override
+	public void setSize(int writerIndex) {
+		writerIndex(writerIndex);
+	}
+
+	/**
+	 * Sets the capacity reported by {@link #capacity()}.
+	 *
+	 * <p>Only the stored size is changed here; the reader and writer indices are left untouched.
+	 * NOTE(review): Netty implementations usually clamp the indices when shrinking - confirm
+	 * whether callers rely on the indices staying as they are.
+	 *
+	 * @param newCapacity the new capacity, must be within {@code [0, maxCapacity()]}
+	 * @return this buffer
+	 * @throws IllegalArgumentException if {@code newCapacity} is outside the allowed range
+	 */
+	@Override
+	public ByteBuf capacity(int newCapacity) {
+		ensureAccessible();
+
+		// report the very same bound that is enforced by the check
+		final int upperBound = maxCapacity();
+		if (newCapacity < 0 || newCapacity > upperBound) {
+			throw new IllegalArgumentException("Size of buffer must be >= 0 and <= " +
+				upperBound + ", but was " + newCapacity + ".");
+		}
+
+		this.currentSize = newCapacity;
+		return this;
+	}
+
+	/** Always reports {@link ByteOrder#BIG_ENDIAN}, matching the big-endian accessors used above. */
+	@Override
+	public ByteOrder order() {
+		return ByteOrder.BIG_ENDIAN;
+	}
+
+	/** Returns {@code null} because this buffer does not wrap another {@link ByteBuf}. */
+	@Override
+	public ByteBuf unwrap() {
+		// not a wrapper of another buffer
+		return null;
+	}
+
+	/** Reports whether the underlying memory segment is off-heap. */
+	@Override
+	public boolean isDirect() {
+		final boolean offHeap = memorySegment.isOffHeap();
+		return offHeap;
+	}
+
+	/**
+	 * Copies {@code length} bytes starting at {@code index} into {@code dst} at
+	 * {@code dstIndex}.
+	 *
+	 * <p>Depending on what {@code dst} offers, the data is transferred via its backing array,
+	 * via its NIO buffers, or by asking {@code dst} to pull the bytes from this buffer.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+		// logic taken from UnpooledDirectByteBuf
+		checkDstIndex(index, length, dstIndex, dst.capacity());
+
+		if (dst.hasArray()) {
+			getBytes(index, dst.array(), dst.arrayOffset() + dstIndex, length);
+			return this;
+		}
+
+		if (dst.nioBufferCount() <= 0) {
+			dst.setBytes(dstIndex, this, index, length);
+			return this;
+		}
+
+		// fill the destination's NIO buffers one after another
+		int sourcePosition = index;
+		final ByteBuffer[] targets = dst.nioBuffers(dstIndex, length);
+		for (int i = 0; i < targets.length; i++) {
+			final ByteBuffer target = targets[i];
+			final int chunkSize = target.remaining();
+			getBytes(sourcePosition, target);
+			sourcePosition += chunkSize;
+		}
+		return this;
+	}
+
+	/**
+	 * Copies {@code length} bytes starting at {@code index} from the memory segment into
+	 * {@code dst} at {@code dstIndex}.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+		final int dstCapacity = dst.length;
+		checkDstIndex(index, length, dstIndex, dstCapacity);
+
+		memorySegment.get(index, dst, dstIndex, length);
+
+		return this;
+	}
+
+	/**
+	 * Copies as many bytes as {@code dst} has remaining from this buffer, starting at
+	 * {@code index}, into {@code dst}.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf getBytes(int index, ByteBuffer dst) {
+		final int numBytes = dst.remaining();
+		checkIndex(index, numBytes);
+
+		memorySegment.get(index, dst, numBytes);
+
+		return this;
+	}
+
+	/**
+	 * Writes {@code length} bytes starting at {@code index} to the given stream.
+	 *
+	 * <p>Heap segments are written straight from their backing array; off-heap segments are
+	 * first copied into a temporary byte array.
+	 *
+	 * @return this buffer
+	 * @throws IOException if writing to {@code out} fails
+	 */
+	@Override
+	public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf
+		checkIndex(index, length);
+
+		if (length != 0) {
+			if (!memorySegment.isOffHeap()) {
+				out.write(memorySegment.getArray(), index, length);
+			} else {
+				final byte[] staging = new byte[length];
+				memorySegment.wrap(index, length).get(staging);
+				out.write(staging);
+			}
+		}
+
+		return this;
+	}
+
+	/**
+	 * Writes up to {@code length} bytes starting at {@code index} to the given channel.
+	 *
+	 * @return the result of the channel's write call, or {@code 0} if {@code length} is zero
+	 * @throws IOException if writing to {@code out} fails
+	 */
+	@Override
+	public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf
+		checkIndex(index, length);
+
+		return length == 0 ? 0 : out.write(memorySegment.wrap(index, length));
+	}
+
+	/**
+	 * Copies {@code length} bytes from {@code src}, starting at {@code srcIndex}, into this
+	 * buffer at {@code index}.
+	 *
+	 * <p>If {@code src} exposes NIO buffers they are copied one by one; otherwise {@code src}
+	 * is asked to push its bytes into this buffer.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+		// logic taken from UnpooledDirectByteBuf
+		checkSrcIndex(index, length, srcIndex, src.capacity());
+
+		if (src.nioBufferCount() <= 0) {
+			src.getBytes(srcIndex, this, index, length);
+			return this;
+		}
+
+		int targetPosition = index;
+		final ByteBuffer[] sources = src.nioBuffers(srcIndex, length);
+		for (int i = 0; i < sources.length; i++) {
+			final ByteBuffer source = sources[i];
+			final int chunkSize = source.remaining();
+			setBytes(targetPosition, source);
+			targetPosition += chunkSize;
+		}
+		return this;
+	}
+
+	/**
+	 * Copies {@code length} bytes from {@code src}, starting at {@code srcIndex}, into this
+	 * buffer at {@code index}.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+		// adapted from UnpooledDirectByteBuf
+		final int srcCapacity = src.length;
+		checkSrcIndex(index, length, srcIndex, srcCapacity);
+
+		memorySegment.wrap(index, length).put(src, srcIndex, length);
+		return this;
+	}
+
+	/**
+	 * Copies all remaining bytes of {@code src} into this buffer, starting at {@code index}.
+	 *
+	 * @return this buffer
+	 */
+	@Override
+	public ByteBuf setBytes(int index, ByteBuffer src) {
+		// adapted from UnpooledDirectByteBuf
+		final int numBytes = src.remaining();
+		checkIndex(index, numBytes);
+
+		memorySegment.wrap(index, numBytes).put(src);
+		return this;
+	}
+
+	/**
+	 * Reads up to {@code length} bytes from the stream into this buffer at {@code index}.
+	 *
+	 * <p>Heap segments are filled directly through their backing array; for off-heap segments
+	 * the data is read into a temporary array first and then copied over.
+	 *
+	 * @return the value returned by the stream's read call
+	 * @throws IOException if reading from {@code in} fails
+	 */
+	@Override
+	public int setBytes(int index, InputStream in, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf
+		checkIndex(index, length);
+
+		if (!memorySegment.isOffHeap()) {
+			return in.read(memorySegment.getArray(), index, length);
+		}
+
+		final byte[] staging = new byte[length];
+		final int numRead = in.read(staging);
+		if (numRead > 0) {
+			// only copy what was actually read
+			memorySegment.wrap(index, length).put(staging, 0, numRead);
+		}
+		return numRead;
+	}
+
+	/**
+	 * Reads up to {@code length} bytes from the channel into this buffer at {@code index}.
+	 *
+	 * @return the value returned by the channel's read call, or {@code -1} if the channel
+	 *         turned out to be closed
+	 * @throws IOException if reading fails for any reason other than a closed channel
+	 */
+	@Override
+	public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+		// adapted from UnpooledDirectByteBuf
+		checkIndex(index, length);
+
+		final ByteBuffer target = memorySegment.wrap(index, length);
+		int numRead;
+		try {
+			numRead = in.read(target);
+		} catch (ClosedChannelException ignored) {
+			// a closed channel is reported via the -1 return value instead of an exception
+			numRead = -1;
+		}
+		return numRead;
+	}
+
+	/**
+	 * Returns the netty buffer allocator previously set via
+	 * {@link #setAllocator(ByteBufAllocator)}.
+	 *
+	 * @return the configured allocator, never {@code null}
+	 * @throws NullPointerException if no allocator has been set
+	 */
+	@Override
+	public ByteBufAllocator alloc() {
+		// fail with a descriptive message instead of a bare NPE
+		return checkNotNull(allocator,
+			"No ByteBufAllocator set for this buffer, call setAllocator() first.");
+	}
+
+	/**
+	 * Sets the buffer allocator for use in netty.
+	 *
+	 * <p>The value is stored as-is and later handed out by {@link #alloc()}, which rejects a
+	 * <tt>null</tt> allocator.
+	 *
+	 * @param allocator netty buffer allocator
+	 */
+	public void setAllocator(ByteBufAllocator allocator) {
+		this.allocator = allocator;
+	}
+
+	/**
+	 * Creates a copy of {@code length} bytes starting at {@code index} in a buffer obtained
+	 * from {@link #alloc()}.
+	 *
+	 * @return the newly allocated buffer holding the copied bytes
+	 */
+	@Override
+	public ByteBuf copy(int index, int length) {
+		checkIndex(index, length);
+
+		final ByteBufAllocator bufAllocator = alloc();
+		final ByteBuf duplicate = bufAllocator.buffer(length, maxCapacity());
+		duplicate.writeBytes(this, index, length);
+		return duplicate;
+	}
+
+	/**
+	 * Transfers {@code length} readable bytes into a buffer obtained from {@link #alloc()} and
+	 * advances the reader index accordingly.
+	 *
+	 * @return the new buffer, or {@link Unpooled#EMPTY_BUFFER} if {@code length} is zero
+	 */
+	@Override
+	public ByteBuf readBytes(int length) {
+		// copied from the one in netty 4.0.50 fixing the wrong allocator being used
+		checkReadableBytes(length);
+		if (length == 0) {
+			return Unpooled.EMPTY_BUFFER;
+		}
+
+		final ByteBufAllocator bufAllocator = alloc();
+		final ByteBuf target = bufAllocator.buffer(length, maxCapacity());
+		final int start = readerIndex();
+		target.writeBytes(this, start, length);
+		readerIndex(start + length);
+		return target;
+	}
+
+	/** Always {@code 1}: {@link #nioBuffers(int, int)} returns a single-element array. */
+	@Override
+	public int nioBufferCount() {
+		return 1;
+	}
+
+	/** Getter-style alias that forwards to the no-argument {@code nioBuffer()}. */
+	@Override
+	public ByteBuffer getNioBufferReadable() {
+		final ByteBuffer readable = nioBuffer();
+		return readable;
+	}
+
+	/** Getter-style alias that forwards to {@link #nioBuffer(int, int)}. */
+	@Override
+	public ByteBuffer getNioBuffer(int index, int length) {
+		final ByteBuffer view = nioBuffer(index, length);
+		return view;
+	}
+
+	/**
+	 * Wraps {@code length} bytes of the memory segment starting at {@code index} into a
+	 * {@link ByteBuffer} and returns a slice of it.
+	 */
+	@Override
+	public ByteBuffer nioBuffer(int index, int length) {
+		checkIndex(index, length);
+
+		final ByteBuffer wrapped = memorySegment.wrap(index, length);
+		return wrapped.slice();
+	}
+
+	/** Forwards to {@link #nioBuffer(int, int)}; no separate internal buffer is kept here. */
+	@Override
+	public ByteBuffer internalNioBuffer(int index, int length) {
+		final ByteBuffer view = nioBuffer(index, length);
+		return view;
+	}
+
+	/** Returns a single-element array holding the result of {@link #nioBuffer(int, int)}. */
+	@Override
+	public ByteBuffer[] nioBuffers(int index, int length) {
+		final ByteBuffer single = nioBuffer(index, length);
+		return new ByteBuffer[] { single };
+	}
+
+	/** Reports a backing array exactly when the memory segment is not off-heap. */
+	@Override
+	public boolean hasArray() {
+		final boolean offHeap = memorySegment.isOffHeap();
+		return !offHeap;
+	}
+
+	/**
+	 * Returns the array obtained from the memory segment after verifying that this buffer is
+	 * still accessible.
+	 */
+	@Override
+	public byte[] array() {
+		ensureAccessible();
+
+		final byte[] backingArray = memorySegment.getArray();
+		return backingArray;
+	}
+
+	/** Always {@code 0}: buffer indices are passed to the backing array without any shift. */
+	@Override
+	public int arrayOffset() {
+		return 0;
+	}
+
+	/** Reports a memory address exactly when the memory segment is off-heap. */
+	@Override
+	public boolean hasMemoryAddress() {
+		final boolean offHeap = memorySegment.isOffHeap();
+		return offHeap;
+	}
+
+	/** Returns the address reported by the underlying memory segment. */
+	@Override
+	public long memoryAddress() {
+		final long address = memorySegment.getAddress();
+		return address;
+	}
+
+	/**
+	 * Renders the hash code, reader/writer index, capacity (plus the maximum capacity unless
+	 * it is {@link Integer#MAX_VALUE}) and the reference count; buffers with a reference count
+	 * of zero are only reported as freed.
+	 */
+	@Override
+	public String toString() {
+		if (refCnt() == 0) {
+			return String.format("Buffer %s (freed)", hashCode());
+		}
+
+		String description = "Buffer " + hashCode()
+			+ " (ridx: " + readerIndex()
+			+ ", widx: " + writerIndex()
+			+ ", cap: " + capacity();
+		if (maxCapacity() != Integer.MAX_VALUE) {
+			description += "/" + maxCapacity();
+		}
+		description += ", ref count: " + refCnt() + ")";
+		return description;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
index f5279bf..dbddaa7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -291,20 +292,20 @@ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
 	private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.BufferResponse bufferOrEvent) throws Throwable {
 		try {
+			int size = bufferOrEvent.getSize();
 			if (bufferOrEvent.isBuffer()) {
 				// ---- Buffer ------------------------------------------------
 
 				// Early return for empty buffers. Otherwise Netty's readBytes() throws an
 				// IndexOutOfBoundsException.
-				if (bufferOrEvent.getSize() == 0) {
+				if (size == 0) {
 					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 					return;
 				}
 
-				Buffer buffer = inputChannel.requestBuffer();
+				NetworkBuffer buffer = (NetworkBuffer) inputChannel.requestBuffer();
 				if (buffer != null) {
-					buffer.setSize(bufferOrEvent.getSize());
-					bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
+					bufferOrEvent.getNettyBuffer().readBytes(buffer, size);
 
 					inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 				} else if (inputChannel.isReleased()) {
@@ -315,11 +316,11 @@ class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 			} else {
 				// ---- Event -------------------------------------------------
 				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
-				byte[] byteArray = new byte[bufferOrEvent.getSize()];
+				byte[] byteArray = new byte[size];
 				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index cffad83..db3a925 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -255,7 +255,7 @@ public abstract class NettyMessage {
 			this.buffer = checkNotNull(buffer);
 			this.retainedSlice = null;
 			this.isBuffer = buffer.isBuffer();
-			this.size = buffer.getSize();
+			this.size = buffer.getMaxCapacity();
 			this.sequenceNumber = sequenceNumber;
 			this.receiverId = checkNotNull(receiverId);
 			this.backlog = backlog;
@@ -288,18 +288,20 @@ public abstract class NettyMessage {
 		ByteBuf write(ByteBufAllocator allocator) throws IOException {
 			checkNotNull(buffer, "No buffer instance to serialize.");
 
-			int length = 16 + 4 + 4 + 1 + 4 + buffer.getSize();
 
 			ByteBuf result = null;
 			try {
+				ByteBuffer nioBufferReadable = buffer.getNioBufferReadable();
+				int length = 16 + 4 + 4 + 1 + 4 + nioBufferReadable.remaining();
+
 				result = allocateBuffer(allocator, ID, length);
 
 				receiverId.writeTo(result);
 				result.writeInt(sequenceNumber);
 				result.writeInt(backlog);
 				result.writeBoolean(buffer.isBuffer());
-				result.writeInt(buffer.getSize());
-				result.writeBytes(buffer.getNioBuffer());
+				result.writeInt(nioBufferReadable.remaining());
+				result.writeBytes(nioBufferReadable);
 
 				return result;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index e50c059..c7b20de 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
 import org.apache.flink.runtime.io.network.netty.exception.TransportException;
@@ -270,12 +271,14 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		boolean releaseNettyBuffer = true;
 
 		try {
+			final int receivedSize = bufferOrEvent.getSize();
+
 			if (bufferOrEvent.isBuffer()) {
 				// ---- Buffer ------------------------------------------------
 
 				// Early return for empty buffers. Otherwise Netty's readBytes() throws an
 				// IndexOutOfBoundsException.
-				if (bufferOrEvent.getSize() == 0) {
+				if (receivedSize == 0) {
 					inputChannel.onEmptyBuffer(bufferOrEvent.sequenceNumber, -1);
 					return true;
 				}
@@ -289,11 +292,10 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 				}
 
 				while (true) {
-					Buffer buffer = bufferProvider.requestBuffer();
+					NetworkBuffer buffer = (NetworkBuffer) bufferProvider.requestBuffer();
 
 					if (buffer != null) {
-						buffer.setSize(bufferOrEvent.getSize());
-						bufferOrEvent.getNettyBuffer().readBytes(buffer.getNioBuffer());
+						bufferOrEvent.getNettyBuffer().readBytes(buffer, receivedSize);
 
 						inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
 
@@ -312,11 +314,12 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			else {
 				// ---- Event -------------------------------------------------
 				// TODO We can just keep the serialized data in the Netty buffer and release it later at the reader
-				byte[] byteArray = new byte[bufferOrEvent.getSize()];
+				byte[] byteArray = new byte[receivedSize];
 				bufferOrEvent.getNettyBuffer().readBytes(byteArray);
 
 				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
-				Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
+				Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
+				buffer.setSize(receivedSize);
 
 				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
 
@@ -364,7 +367,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 	 */
 	private class BufferListenerTask implements BufferListener, Runnable {
 
-		private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();
+		private final AtomicReference<NetworkBuffer> availableBuffer = new AtomicReference<>();
 
 		private NettyMessage.BufferResponse stagedBufferResponse;
 
@@ -409,7 +412,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			boolean success = false;
 
 			try {
-				if (availableBuffer.compareAndSet(null, buffer)) {
+				if (availableBuffer.compareAndSet(null, (NetworkBuffer) buffer)) {
 					ctx.channel().eventLoop().execute(this);
 
 					success = true;
@@ -442,16 +445,14 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 		public void run() {
 			boolean success = false;
 
-			Buffer buffer = null;
+			NetworkBuffer buffer = null;
 
 			try {
 				if ((buffer = availableBuffer.getAndSet(null)) == null) {
 					throw new IllegalStateException("Running buffer availability task w/o a buffer.");
 				}
 
-				buffer.setSize(stagedBufferResponse.getSize());
-
-				stagedBufferResponse.getNettyBuffer().readBytes(buffer.getNioBuffer());
+				stagedBufferResponse.getNettyBuffer().readBytes(buffer, stagedBufferResponse.getSize());
 				stagedBufferResponse.releaseBuffer();
 
 				RemoteInputChannel inputChannel = inputChannels.get(stagedBufferResponse.receiverId);

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 20e0406..f0752c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -193,7 +194,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
 				}
 			}
 		}
@@ -204,7 +205,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 				if (isDestroyed) {
 					memorySegment.free();
 				} else {
-					buffers.add(new Buffer(memorySegment, this));
+					buffers.add(new NetworkBuffer(memorySegment, this));
 					buffers.notifyAll();
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 397f407..9b0226a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -35,11 +36,12 @@ import org.apache.flink.util.ExceptionUtils;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -140,9 +142,9 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		this.initialCredit = segments.size();
 		this.numRequiredBuffers = segments.size();
 
-		synchronized(bufferQueue) {
+		synchronized (bufferQueue) {
 			for (MemorySegment segment : segments) {
-				bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
+				bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
 			}
 		}
 	}
@@ -312,7 +314,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 					ExceptionUtils.rethrow(t);
 				}
 			}
-			numAddedBuffers = bufferQueue.addExclusiveBuffer(new Buffer(segment, this), numRequiredBuffers);
+			numAddedBuffers = bufferQueue.addExclusiveBuffer(new NetworkBuffer(segment, this), numRequiredBuffers);
 		}
 
 		if (numAddedBuffers > 0 && unannouncedCredit.getAndAdd(numAddedBuffers) == 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 4c25e0d..1f78cd9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.util.TestNotificationListener;
 
 import org.junit.AfterClass;
@@ -83,7 +84,7 @@ public class AsynchronousBufferFileWriterTest {
 
 		exception.expect(IOException.class);
 
-		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
+		Buffer buffer = new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
 			FreeingBufferRecycler.INSTANCE);
 		try {
 			writer.writeBlock(buffer);

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
index 07bf3ab..2b52184 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterFileSegmentReaderTest.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.util.event.NotificationListener;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -35,6 +36,8 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.flink.runtime.io.disk.iomanager.BufferFileWriterReaderTest.fillBufferWithAscendingNumbers;
+import static org.apache.flink.runtime.io.disk.iomanager.BufferFileWriterReaderTest.verifyBufferFilledWithAscendingNumbers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -108,9 +111,7 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 			int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
 
-			buffer.setSize(size);
-
-			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber, size);
 
 			writer.writeBlock(buffer);
 		}
@@ -153,9 +154,9 @@ public class BufferFileWriterFileSegmentReaderTest {
 
 			fileSegment.getFileChannel().read(buffer, fileSegment.getPosition());
 
-			currentNumber = verifyBufferFilledWithAscendingNumbers(
-					new Buffer(MemorySegmentFactory.wrap(buffer.array()), BUFFER_RECYCLER),
-					currentNumber, fileSegment.getLength());
+			NetworkBuffer buffer1 = new NetworkBuffer(MemorySegmentFactory.wrap(buffer.array()), BUFFER_RECYCLER);
+			buffer1.setSize(fileSegment.getLength());
+			currentNumber = verifyBufferFilledWithAscendingNumbers(buffer1, currentNumber);
 		}
 
 		reader.close();
@@ -178,30 +179,6 @@ public class BufferFileWriterFileSegmentReaderTest {
 	}
 
 	private Buffer createBuffer() {
-		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
-	}
-
-	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
-		MemorySegment segment = buffer.getMemorySegment();
-
-		final int size = buffer.getSize();
-
-		for (int i = 0; i < size; i += 4) {
-			segment.putInt(i, currentNumber++);
-		}
-
-		return currentNumber;
-	}
-
-	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
-		MemorySegment segment = buffer.getMemorySegment();
-
-		for (int i = 0; i < size; i += 4) {
-			if (segment.getInt(i) != currentNumber++) {
-				throw new IllegalStateException("Read unexpected number from buffer.");
-			}
-		}
-
-		return currentNumber;
+		return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
index 86e8758..29e7b44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/BufferFileWriterReaderTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -33,6 +34,7 @@ import java.io.IOException;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -106,9 +108,7 @@ public class BufferFileWriterReaderTest {
 
 			int size = getNextMultipleOf(getRandomNumberInRange(minBufferSize, BUFFER_SIZE), 4);
 
-			buffer.setSize(size);
-
-			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber, size);
 
 			writer.writeBlock(buffer);
 		}
@@ -146,7 +146,7 @@ public class BufferFileWriterReaderTest {
 		for (int i = 0; i < numBuffers; i++) {
 			final Buffer buffer = createBuffer();
 
-			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+			currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber, buffer.getMaxCapacity());
 
 			writer.writeBlock(buffer);
 		}
@@ -200,25 +200,26 @@ public class BufferFileWriterReaderTest {
 	}
 
 	private Buffer createBuffer() {
-		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
+		return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), BUFFER_RECYCLER);
 	}
 
-	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
-		MemorySegment segment = buffer.getMemorySegment();
+	static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber, int size) {
+		checkArgument(size % 4 == 0);
 
-		final int size = buffer.getSize();
+		MemorySegment segment = buffer.getMemorySegment();
 
 		for (int i = 0; i < size; i += 4) {
 			segment.putInt(i, currentNumber++);
 		}
+		buffer.setSize(size);
 
 		return currentNumber;
 	}
 
-	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
+	static int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
 		MemorySegment segment = buffer.getMemorySegment();
 
-		final int size = buffer.getSize();
+		int size = buffer.getSize();
 
 		for (int i = 0; i < size; i += 4) {
 			if (segment.getInt(i) != currentNumber++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 5c3d85a..ded1817 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
@@ -497,7 +498,7 @@ public class RecordWriterTest {
 					@Override
 					public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
 						MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(bufferSize);
-						Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
+						Buffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
 						return buffer;
 					}
 				}