You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:25:55 UTC

[4/9] flink git commit: [FLINK-7520][network] let our Buffer class extend from netty's buffer class

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
index 87aab01..d0df02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
@@ -105,7 +105,7 @@ public class BufferBuilderTest {
 	}
 
 	private static void assertBufferContent(Buffer actualBuffer, int... expected) {
-		assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer());
+		assertEquals(toByteBuffer(expected), actualBuffer.getNioBufferReadable());
 	}
 
 	private static BufferBuilder createBufferBuilder() {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 57c8077..f59fa9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -20,54 +20,392 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
 
 import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
-public class BufferTest {
+/**
+ * Tests for the {@link Buffer} class.
+ */
+public class BufferTest extends AbstractByteBufTest {
+
+	/**
+	 * Upper limit for the max size that is sufficient for all the tests.
+	 */
+	private static final int MAX_CAPACITY_UPPER_BOUND = 64 * 1024 * 1024;
+
+	private static final NettyBufferPool NETTY_BUFFER_POOL = new NettyBufferPool(1);
+
+	@Override
+	protected NetworkBuffer newBuffer(int length, int maxCapacity) {
+		return newBuffer(length, maxCapacity, false);
+	}
+
+	/**
+	 * Creates a new buffer for testing.
+	 *
+	 * @param length
+	 * 		buffer capacity
+	 * @param maxCapacity
+	 * 		buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+	 * @param isBuffer
+	 * 		whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+	 *
+	 * @return the buffer
+	 */
+	private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer) {
+		return newBuffer(length, maxCapacity, isBuffer, FreeingBufferRecycler.INSTANCE);
+	}
+
+	/**
+	 * Creates a new buffer for testing.
+	 *
+	 * @param length
+	 * 		buffer capacity
+	 * @param maxCapacity
+	 * 		buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+	 * @param isBuffer
+	 * 		whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+	 * @param recycler
+	 * 		the buffer recycler to use
+	 *
+	 * @return the buffer
+	 */
+	private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer, BufferRecycler recycler) {
+		final MemorySegment segment =
+			MemorySegmentFactory
+				.allocateUnpooledSegment(Math.min(maxCapacity, MAX_CAPACITY_UPPER_BOUND));
+
+		NetworkBuffer buffer = new NetworkBuffer(segment, recycler, isBuffer);
+		buffer.capacity(length);
+		buffer.setAllocator(NETTY_BUFFER_POOL);
+
+		assertSame(ByteOrder.BIG_ENDIAN, buffer.order());
+		assertEquals(0, buffer.readerIndex());
+		assertEquals(0, buffer.writerIndex());
+		return buffer;
+	}
 
 	@Test
-	public void testSetGetSize() {
-		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+	public void testDataBufferIsBuffer() {
+		assertFalse(newBuffer(1024, 1024, false).isBuffer());
+	}
+
+	@Test
+	public void testEventBufferIsBuffer() {
+		assertFalse(newBuffer(1024, 1024, false).isBuffer());
+	}
 
-		Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
-		Assert.assertEquals(segment.size(), buffer.getSize());
+	@Test
+	public void testDataBufferTagAsEvent() {
+		testTagAsEvent(true);
+	}
 
-		buffer.setSize(segment.size() / 2);
-		Assert.assertEquals(segment.size() / 2, buffer.getSize());
+	@Test
+	public void testEventBufferTagAsEvent() {
+		testTagAsEvent(false);
+	}
 
-		try {
-			buffer.setSize(-1);
-			Assert.fail("Didn't throw expected exception");
-		} catch (IllegalArgumentException e) {
-			// OK => expected exception
-		}
+	private static void testTagAsEvent(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		buffer.tagAsEvent();
+		assertFalse(buffer.isBuffer());
+	}
 
-		try {
-			buffer.setSize(segment.size() + 1);
-			Assert.fail("Didn't throw expected exception");
-		} catch (IllegalArgumentException e) {
-			// OK => expected exception
-		}
+	@Test
+	public void testDataBufferGetMemorySegment() {
+		testGetMemorySegment(true);
 	}
 
 	@Test
-	public void testgetNioBufferThreadSafe() {
+	public void testEventBufferGetMemorySegment() {
+		testGetMemorySegment(false);
+	}
+
+	private static void testGetMemorySegment(boolean isBuffer) {
 		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+		NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, isBuffer);
+		assertSame(segment, buffer.getMemorySegment());
+	}
+
+	@Test
+	public void testDataBufferGetRecycler() {
+		testGetRecycler(true);
+	}
+
+	@Test
+	public void testEventBufferGetRecycler() {
+		testGetRecycler(false);
+	}
+
+	private static void testGetRecycler(boolean isBuffer) {
+		BufferRecycler recycler = MemorySegment::free;
+
+		NetworkBuffer dataBuffer = newBuffer(1024, 1024, isBuffer, recycler);
+		assertSame(recycler, dataBuffer.getRecycler());
+	}
+
+	@Test
+	public void testDataBufferRecycleBuffer() {
+		testRecycleBuffer(true);
+	}
+
+	@Test
+	public void testEventBufferRecycleBuffer() {
+		testRecycleBuffer(false);
+	}
+
+	/**
+	 * Tests that {@link NetworkBuffer#recycle()} and {@link NetworkBuffer#isRecycled()} are coupled
+	 * and are also consistent with {@link NetworkBuffer#refCnt()}.
+	 */
+	private static void testRecycleBuffer(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		assertFalse(buffer.isRecycled());
+		buffer.recycle();
+		assertTrue(buffer.isRecycled());
+		assertEquals(0, buffer.refCnt());
+	}
+
+	@Test
+	public void testDataBufferRetainBuffer() {
+		testRetainBuffer(true);
+	}
+
+	@Test
+	public void testEventBufferRetainBuffer() {
+		testRetainBuffer(false);
+	}
+
+	/**
+	 * Tests that {@link NetworkBuffer#retain()} and {@link NetworkBuffer#isRecycled()} are coupled
+	 * and are also consistent with {@link NetworkBuffer#refCnt()}.
+	 */
+	private static void testRetainBuffer(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		assertFalse(buffer.isRecycled());
+		buffer.retain();
+		assertFalse(buffer.isRecycled());
+		assertEquals(2, buffer.refCnt());
+	}
+
+	@Test
+	public void testDataBufferGetMaxCapacity() {
+		testGetMaxCapacity(true);
+	}
+
+	@Test
+	public void testEventBufferGetMaxCapacity() {
+		testGetMaxCapacity(false);
+	}
+
+	private static void testGetMaxCapacity(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+		assertEquals(1024, buffer.getMaxCapacity());
+		MemorySegment segment = buffer.getMemorySegment();
+		Assert.assertEquals(segment.size(), buffer.getMaxCapacity());
+		Assert.assertEquals(segment.size(), buffer.maxCapacity());
+	}
+
+	@Test
+	public void testDataBufferGetSetReaderIndex() {
+		testGetSetReaderIndex(true);
+	}
+
+	@Test
+	public void testEventBufferGetSetReaderIndex() {
+		testGetSetReaderIndex(false);
+	}
+
+	/**
+	 * Tests that {@link NetworkBuffer#setReaderIndex(int)} and
+	 * {@link NetworkBuffer#getReaderIndex()} are consistent.
+	 */
+	private static void testGetSetReaderIndex(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+		assertEquals(0, buffer.getReaderIndex());
+
+		// fake some data
+		buffer.setSize(100);
+		assertEquals(0, buffer.getReaderIndex());
+		buffer.setReaderIndex(1);
+		assertEquals(1, buffer.getReaderIndex());
+	}
+
+	@Test
+	public void testDataBufferSetGetSize() {
+		testSetGetSize(true);
+	}
+
+	@Test
+	public void testEventBufferSetGetSize() {
+		testSetGetSize(false);
+	}
+
+	private static void testSetGetSize(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+		assertEquals(0, buffer.getSize()); // initially 0
+		assertEquals(0, buffer.getSizeUnsafe());
+		assertEquals(buffer.writerIndex(), buffer.getSize());
+		assertEquals(0, buffer.readerIndex()); // initially 0
+
+		buffer.setSize(10);
+		assertEquals(10, buffer.getSize());
+		assertEquals(10, buffer.getSizeUnsafe());
+		assertEquals(buffer.writerIndex(), buffer.getSize());
+		assertEquals(0, buffer.readerIndex()); // independent
+	}
+
+	@Test
+	public void testDataBufferReadableBytes() {
+		testReadableBytes(true);
+	}
+
+	@Test
+	public void testEventBufferReadableBytes() {
+		testReadableBytes(false);
+	}
+
+	private static void testReadableBytes(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+		assertEquals(0, buffer.readableBytes());
+		buffer.setSize(10);
+		assertEquals(10, buffer.readableBytes());
+		buffer.setReaderIndex(2);
+		assertEquals(8, buffer.readableBytes());
+		buffer.setReaderIndex(10);
+		assertEquals(0, buffer.readableBytes());
+	}
+
+	@Test
+	public void testDataBufferGetNioBufferReadable() {
+		testGetNioBufferReadable(true);
+	}
+
+	@Test
+	public void testEventBufferGetNioBufferReadable() {
+		testGetNioBufferReadable(false);
+	}
+
+	private void testGetNioBufferReadable(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
 
-		Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
+		ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+		assertFalse(byteBuffer.isReadOnly());
+		assertEquals(0, byteBuffer.remaining());
+		assertEquals(0, byteBuffer.limit());
+		assertEquals(0, byteBuffer.capacity());
 
-		ByteBuffer buf1 = buffer.getNioBuffer();
-		ByteBuffer buf2 = buffer.getNioBuffer();
+		// add some data
+		buffer.setSize(10);
+		// nothing changes in the byteBuffer
+		assertEquals(0, byteBuffer.remaining());
+		assertEquals(0, byteBuffer.limit());
+		assertEquals(0, byteBuffer.capacity());
+		// get a new byteBuffer (should have updated indices)
+		byteBuffer = buffer.getNioBufferReadable();
+		assertFalse(byteBuffer.isReadOnly());
+		assertEquals(10, byteBuffer.remaining());
+		assertEquals(10, byteBuffer.limit());
+		assertEquals(10, byteBuffer.capacity());
+
+		// modify byteBuffer position and verify nothing has changed in the original buffer
+		byteBuffer.position(1);
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(10, buffer.getSize());
+	}
+
+	@Test
+	public void testGetNioBufferReadableThreadSafe() {
+		NetworkBuffer buffer = newBuffer(1024, 1024);
+
+		ByteBuffer buf1 = buffer.getNioBufferReadable();
+		ByteBuffer buf2 = buffer.getNioBufferReadable();
 
 		assertNotNull(buf1);
 		assertNotNull(buf2);
 
 		assertTrue("Repeated call to getNioBuffer() returns the same nio buffer", buf1 != buf2);
 	}
+
+	@Test
+	public void testDataBufferGetNioBuffer() {
+		testGetNioBuffer(true);
+	}
+
+	@Test
+	public void testEventBufferGetNioBuffer() {
+		testGetNioBuffer(false);
+	}
+
+	private void testGetNioBuffer(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+		ByteBuffer byteBuffer = buffer.getNioBuffer(1, 1);
+		assertFalse(byteBuffer.isReadOnly());
+		assertEquals(1, byteBuffer.remaining());
+		assertEquals(1, byteBuffer.limit());
+		assertEquals(1, byteBuffer.capacity());
+
+		// add some data
+		buffer.setSize(10);
+		// nothing changes in the byteBuffer
+		assertEquals(1, byteBuffer.remaining());
+		assertEquals(1, byteBuffer.limit());
+		assertEquals(1, byteBuffer.capacity());
+		// get a new byteBuffer (should have updated indices)
+		byteBuffer = buffer.getNioBuffer(1, 2);
+		assertFalse(byteBuffer.isReadOnly());
+		assertEquals(2, byteBuffer.remaining());
+		assertEquals(2, byteBuffer.limit());
+		assertEquals(2, byteBuffer.capacity());
+
+		// modify byteBuffer position and verify nothing has changed in the original buffer
+		byteBuffer.position(1);
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(10, buffer.getSize());
+	}
+
+	@Test
+	public void testGetNioBufferThreadSafe() {
+		NetworkBuffer buffer = newBuffer(1024, 1024);
+
+		ByteBuffer buf1 = buffer.getNioBuffer(0, 10);
+		ByteBuffer buf2 = buffer.getNioBuffer(0, 10);
+
+		assertNotNull(buf1);
+		assertNotNull(buf2);
+
+		assertTrue("Repeated call to getNioBuffer(int, int) returns the same nio buffer", buf1 != buf2);
+	}
+
+	@Test
+	public void testDataBufferSetAllocator() {
+		testSetAllocator(true);
+	}
+
+	@Test
+	public void testEventBufferSetAllocator() {
+		testSetAllocator(false);
+	}
+
+	private void testSetAllocator(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		NettyBufferPool allocator = new NettyBufferPool(1);
+
+		buffer.setAllocator(allocator);
+		assertSame(allocator, buffer.alloc());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 4acdb36..b51dc1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.netty;
 
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -37,6 +38,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -189,7 +191,9 @@ public class CancelPartitionRequestTest {
 		@Nullable
 		@Override
 		public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
-			return new BufferAndBacklog(bufferProvider.requestBufferBlocking(), 0);
+			Buffer buffer = bufferProvider.requestBufferBlocking();
+			buffer.setSize(buffer.getMaxCapacity()); // fake some data
+			return new BufferAndBacklog(buffer, 0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index daff1c9..ed5fff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.task.IntegerTaskEvent;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -54,12 +55,13 @@ public class NettyMessageSerializationTest {
 	@Test
 	public void testEncodeDecode() {
 		{
-			Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE));
-			ByteBuffer nioBuffer = buffer.getNioBuffer();
+			Buffer buffer = spy(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE));
+			ByteBuffer nioBuffer = buffer.getNioBuffer(0, buffer.getMaxCapacity());
 
 			for (int i = 0; i < 1024; i += 4) {
 				nioBuffer.putInt(i);
 			}
+			buffer.setSize(1024);
 
 			NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse(buffer, random.nextInt(), new InputChannelID(), random.nextInt());
 			NettyMessage.BufferResponse actual = encodeAndDecode(expected);

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 42a5f11..996722f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -51,6 +51,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -64,7 +65,6 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.hamcrest.Matchers.instanceOf;
 
 public class PartitionRequestClientHandlerTest {
 
@@ -93,7 +93,7 @@ public class PartitionRequestClientHandlerTest {
 		when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
 
 		final BufferResponse receivedBuffer = createBufferResponse(
-				TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2);
+				TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0, inputChannel.getInputChannelId(), 2);
 
 		final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
 		client.addInputChannel(inputChannel);
@@ -110,15 +110,14 @@ public class PartitionRequestClientHandlerTest {
 	public void testReceiveEmptyBuffer() throws Exception {
 		// Minimal mock of a remote input channel
 		final BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+		when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
 
 		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
 		when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
 		when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
 
 		// An empty buffer of size 0
-		final Buffer emptyBuffer = TestBufferFactory.createBuffer();
-		emptyBuffer.setSize(0);
+		final Buffer emptyBuffer = TestBufferFactory.createBuffer(0);
 
 		final int backlog = 2;
 		final BufferResponse receivedBuffer = createBufferResponse(
@@ -186,7 +185,7 @@ public class PartitionRequestClientHandlerTest {
 			0, inputChannel.getNumberOfAvailableBuffers());
 
 		final BufferResponse bufferResponse = createBufferResponse(
-			TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2);
+			TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0, inputChannel.getInputChannelId(), 2);
 		handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
 
 		verify(inputChannel, times(1)).onError(any(IllegalStateException.class));
@@ -200,7 +199,7 @@ public class PartitionRequestClientHandlerTest {
 	public void testReceivePartitionNotFoundException() throws Exception {
 		// Minimal mock of a remote input channel
 		final BufferProvider bufferProvider = mock(BufferProvider.class);
-		when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+		when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
 
 		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
 		when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8b1e8d2..56ed622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
@@ -107,7 +105,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
 
 		// Add data to the queue...
-		subpartition.add(createBuffer());
+		subpartition.add(createBuffer(BUFFER_SIZE));
 		assertEquals(1, subpartition.getTotalNumberOfBuffers());
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 
@@ -127,7 +125,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// Add data to the queue...
-		subpartition.add(createBuffer());
+		subpartition.add(createBuffer(BUFFER_SIZE));
 
 		assertEquals(2, subpartition.getTotalNumberOfBuffers());
 		assertEquals(1, subpartition.getBuffersInBacklog());
@@ -135,7 +133,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
 
 		// Add event to the queue...
-		Buffer event = createBuffer();
+		Buffer event = createBuffer(BUFFER_SIZE);
 		event.tagAsEvent();
 		subpartition.add(event);
 
@@ -290,10 +288,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 	private void testCleanupReleasedPartition(boolean createView) throws Exception {
 		PipelinedSubpartition partition = createSubpartition();
 
-		Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-			FreeingBufferRecycler.INSTANCE);
-		Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
-			FreeingBufferRecycler.INSTANCE);
+		Buffer buffer1 = createBuffer(4096);
+		Buffer buffer2 = createBuffer(4096);
 		boolean buffer1Recycled;
 		boolean buffer2Recycled;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4512625..5f5f459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -64,7 +64,7 @@ public class ResultPartitionTest {
 			// Pipelined, send message => notify
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
-			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+			partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
 			verify(notifier, times(1))
 				.notifyPartitionConsumable(
 					eq(partition.getJobId()),
@@ -76,7 +76,7 @@ public class ResultPartitionTest {
 			// Pipelined, don't send message => don't notify
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
-			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+			partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
 			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
 		}
 
@@ -84,7 +84,7 @@ public class ResultPartitionTest {
 			// Blocking, send message => don't notify
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
-			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+			partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
 			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
 		}
 
@@ -92,7 +92,7 @@ public class ResultPartitionTest {
 			// Blocking, don't send message => don't notify
 			ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 			ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
-			partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+			partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
 			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
 		}
 	}
@@ -114,7 +114,7 @@ public class ResultPartitionTest {
 	 */
 	protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
 		throws Exception {
-		Buffer buffer = TestBufferFactory.createBuffer();
+		Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 		try {
 			ResultPartition partition = createPartition(notifier, pipelined, true);
@@ -152,7 +152,7 @@ public class ResultPartitionTest {
 	 */
 	protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
 		throws Exception {
-		Buffer buffer = TestBufferFactory.createBuffer();
+		Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 		try {
 			ResultPartition partition = createPartition(notifier, pipelined, true);
@@ -217,7 +217,7 @@ public class ResultPartitionTest {
 		throws Exception {
 		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
 		ResultPartition partition = createPartition(notifier, pipelined, true);
-		Buffer buffer = TestBufferFactory.createBuffer();
+		Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 		try {
 			// partition.add() adds the buffer without recycling it (if not spilling)
 			partition.writeBuffer(buffer, 0);

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 3b00b2e..9664945 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -193,7 +193,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	public void testConsumeSpilledPartition() throws Exception {
 		SpillableSubpartition partition = createSubpartition();
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		buffer.retain();
 		buffer.retain();
 
@@ -291,7 +291,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
 		SpillableSubpartition partition = createSubpartition();
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		buffer.retain();
 		buffer.retain();
 
@@ -403,7 +403,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(1, partition.getTotalNumberOfBuffers());
 		assertEquals(4, partition.getTotalNumberOfBytes());
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		try {
 			partition.add(buffer);
 		} finally {
@@ -447,7 +447,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			assertEquals(0, partition.releaseMemory());
 		}
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		boolean bufferRecycled;
 		try {
 			partition.add(buffer);
@@ -475,7 +475,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpillableSubpartition partition = createSubpartition(ioManager);
 		assertEquals(0, partition.releaseMemory());
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		boolean bufferRecycled;
 		try {
 			partition.add(buffer);
@@ -521,8 +521,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
 		SpillableSubpartition partition = createSubpartition(ioManager);
 
-		Buffer buffer1 = TestBufferFactory.createBuffer(4096);
-		Buffer buffer2 = TestBufferFactory.createBuffer(4096);
+		Buffer buffer1 = TestBufferFactory.createBuffer(4096, 4096);
+		Buffer buffer2 = TestBufferFactory.createBuffer(4096, 4096);
 		try {
 			// we need two buffers because the view will use one of them and not release it
 			partition.add(buffer1);
@@ -569,7 +569,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		exception.expect(IOException.class);
 
-		Buffer buffer = TestBufferFactory.createBuffer(4096);
+		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
 		boolean bufferRecycled;
 		try {
 			partition.add(buffer);
@@ -635,8 +635,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
 		SpillableSubpartition partition = createSubpartition();
 
-		Buffer buffer1 = TestBufferFactory.createBuffer(4096);
-		Buffer buffer2 = TestBufferFactory.createBuffer(4096);
+		Buffer buffer1 = TestBufferFactory.createBuffer(4096, 4096);
+		Buffer buffer2 = TestBufferFactory.createBuffer(4096, 4096);
 		boolean buffer1Recycled;
 		boolean buffer2Recycled;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 89a4e03..08444f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -213,7 +213,7 @@ public class SpilledSubpartitionViewTest {
 		final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());
 
 		for (int i = 0; i < numberOfBuffers; i++) {
-			writer.writeBlock(TestBufferFactory.createBuffer());
+			writer.writeBlock(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE));
 		}
 
 		writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 2d56258..5e12835 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -57,7 +56,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			assertEquals(0, subpartition.getBuffersInBacklog());
 			assertEquals(4, subpartition.getTotalNumberOfBytes());
 
-			Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+			Buffer buffer = createBuffer(4096, 4096);
 
 			assertFalse(subpartition.add(buffer));
 			assertEquals(1, subpartition.getTotalNumberOfBuffers());
@@ -83,7 +82,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			assertEquals(0, subpartition.getBuffersInBacklog());
 			assertEquals(0, subpartition.getTotalNumberOfBytes());
 
-			Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+			Buffer buffer = createBuffer(4096, 4096);
 
 			assertFalse(subpartition.add(buffer));
 			assertEquals(0, subpartition.getTotalNumberOfBuffers());
@@ -112,7 +111,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 
 	private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
 		// Add a buffer
-		Buffer buffer = TestBufferFactory.createBuffer();
+		Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
 		partition.add(buffer);
 		partition.finish();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index eab1d89..469aa97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -69,7 +69,7 @@ public class RemoteInputChannelTest {
 		// Setup
 		final SingleInputGate inputGate = mock(SingleInputGate.class);
 		final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
-		final Buffer buffer = TestBufferFactory.createBuffer();
+		final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 
 		// The test
 		inputChannel.onBuffer(buffer.retain(), 0, -1);
@@ -103,7 +103,7 @@ public class RemoteInputChannelTest {
 
 		// Setup
 		final ExecutorService executor = Executors.newFixedThreadPool(2);
-		final Buffer buffer = TestBufferFactory.createBuffer();
+		final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 
 		try {
 			// Test

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index f45d98e..364ac75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.LocalConnectionManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -126,7 +126,7 @@ public class SingleInputGateTest {
 
 		final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
 		when(iterator.getNextBuffer()).thenReturn(
-			new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0));
+			new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0));
 
 		final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 		when(partitionManager.createSubpartitionView(

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index e0dc429..406b81f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import javax.annotation.concurrent.ThreadSafe;
 
@@ -56,7 +57,7 @@ public class TestBufferFactory {
 		}
 
 		numberOfCreatedBuffers++;
-		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
+		return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
 	}
 
 	public synchronized int getNumberOfCreatedBuffers() {
@@ -71,13 +72,31 @@ public class TestBufferFactory {
 	// Static test helpers
 	// ------------------------------------------------------------------------
 
-	public static Buffer createBuffer() {
-		return createBuffer(BUFFER_SIZE);
+	/**
+	 * Creates a (network) buffer with default size, i.e. {@link #BUFFER_SIZE}, and unspecified data
+	 * of the given size.
+	 *
+	 * @param dataSize
+	 * 		size of the data in the buffer, i.e. the new writer index
+	 *
+	 * @return a new buffer instance
+	 */
+	public static Buffer createBuffer(int dataSize) {
+		return createBuffer(BUFFER_SIZE, dataSize);
 	}
 
-	public static Buffer createBuffer(int bufferSize) {
-		checkArgument(bufferSize > 0);
-
-		return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
+	/**
+	 * Creates a (network) buffer with unspecified data of the given size.
+	 *
+	 * @param bufferSize
+	 * 		size of the buffer
+	 * @param dataSize
+	 * 		size of the data in the buffer, i.e. the new writer index
+	 *
+	 * @return a new buffer instance
+	 */
+	public static Buffer createBuffer(int bufferSize, int dataSize) {
+		return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+				RECYCLER, true, dataSize);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index eb80578..eba5912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferListener;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
 
@@ -124,7 +125,7 @@ public class TestPooledBufferProvider implements BufferProvider {
 		@Override
 		public void recycle(MemorySegment segment) {
 			synchronized (listenerRegistrationLock) {
-				final Buffer buffer = new Buffer(segment, this);
+				final Buffer buffer = new NetworkBuffer(segment, this);
 
 				BufferListener listener = registeredListeners.poll();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 98037cb..7f6e875 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.util.StringUtils;
 
@@ -131,7 +132,7 @@ public class BufferSpiller {
 			ByteBuffer contents;
 			if (boe.isBuffer()) {
 				Buffer buf = boe.getBuffer();
-				contents = buf.getMemorySegment().wrap(0, buf.getSize());
+				contents = buf.getNioBufferReadable();
 			}
 			else {
 				contents = EventSerializer.toSerializedEvent(boe.getEvent());
@@ -380,7 +381,7 @@ public class BufferSpiller {
 					}
 				}
 
-				Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+				Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
 				buf.setSize(length);
 
 				return new BufferOrEvent(buf, channel);

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 3899fe1..cca6fb3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
@@ -277,7 +278,7 @@ public class BarrierBufferAlignmentLimitTest {
 		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
 		memory.put(0, bytes);
 
-		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
 		buf.setSize(size);
 
 		// retain an additional time so it does not get disposed after being read by the input gate
@@ -296,6 +297,7 @@ public class BarrierBufferAlignmentLimitTest {
 		assertEquals(expected.isBuffer(), present.isBuffer());
 
 		if (expected.isBuffer()) {
+			assertEquals(expected.getBuffer().getMaxCapacity(), present.getBuffer().getMaxCapacity());
 			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
 			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
 			MemorySegment presentMem = present.getBuffer().getMemorySegment();

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 57ecf3a..c7c9df2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -1406,7 +1407,7 @@ public class BarrierBufferTest {
 		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
 		memory.put(0, bytes);
 
-		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
 		buf.setSize(size);
 
 		// retain an additional time so it does not get disposed after being read by the input gate
@@ -1425,6 +1426,7 @@ public class BarrierBufferTest {
 		assertEquals(expected.isBuffer(), present.isBuffer());
 
 		if (expected.isBuffer()) {
+			assertEquals(expected.getBuffer().getMaxCapacity(), present.getBuffer().getMaxCapacity());
 			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
 			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
 			MemorySegment presentMem = present.getBuffer().getMemorySegment();

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 535fdb9..bcf821d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -480,7 +480,7 @@ public class BarrierTrackerTest {
 
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
-				new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
+				new NetworkBuffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 4edb665..ee58052 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 
 import org.junit.After;
@@ -374,7 +375,7 @@ public class BufferSpillerTest {
 			seg.put(i, (byte) i);
 		}
 
-		Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+		Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
 		buf.setSize(size);
 		return new BufferOrEvent(buf, channelIndex);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 72f11ca..79e8b30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1034,6 +1034,8 @@ under the License.
 						<exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude>
 						<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
 						<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
+						<!-- netty test file, still Apache License 2.0 but with a different header -->
+						<exclude>flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java</exclude>
 						<!-- Configuration Files. -->
 						<exclude>**/flink-bin/conf/slaves</exclude>
 						<exclude>**/flink-bin/conf/masters</exclude>

http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 16f444c..9d9cb87 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -93,6 +93,10 @@ under the License.
 	<suppress
 		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](api|buffer|netty|partition|serialization|util)[/\\](.*)"
 		checks="AvoidStarImport|UnusedImports"/>
+	<!--Test class copied from the netty project-->
+	<suppress
+		files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\]buffer[/\\]AbstractByteBufTest.java"
+		checks="[a-zA-Z0-9]*"/>
 	<suppress
 		files="(.*)runtime[/\\]jobgraph[/\\](.*)"
 		checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>