You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:36:17 UTC

[1/2] flink git commit: [hotfix][network] rename Buffer#retain() and #recycle() in preparation for FLINK-8396 and FLINK-8395

Repository: flink
Updated Branches:
  refs/heads/master 9d0dfcba6 -> 665347cf8


[hotfix][network] rename Buffer#retain() and #recycle() in preparation for FLINK-8396 and FLINK-8395

Since these two methods also exist in Netty's ByteBuf, we would otherwise get
into overloading conflicts.

Also add Buffer#readableBytes() and Buffer#setAllocator().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db440f24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db440f24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db440f24

Branch: refs/heads/master
Commit: db440f2434423a23207ba666b33f4ccb55adede5
Parents: 9d0dfcb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 9 22:56:38 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:31:18 2018 +0100

----------------------------------------------------------------------
 .../iomanager/AsynchronousBufferFileWriter.java |  6 +--
 .../iomanager/SynchronousBufferFileReader.java  |  2 +-
 .../api/reader/AbstractRecordReader.java        |  4 +-
 .../io/network/api/writer/RecordWriter.java     |  6 +--
 .../api/writer/ResultPartitionWriter.java       |  4 +-
 .../flink/runtime/io/network/buffer/Buffer.java | 23 +++++++++--
 .../io/network/buffer/NetworkBuffer.java        | 10 ++---
 .../runtime/io/network/netty/NettyMessage.java  |  2 +-
 .../netty/PartitionRequestClientHandler.java    |  4 +-
 .../io/network/netty/PartitionRequestQueue.java |  2 +-
 .../partition/PipelinedSubpartition.java        |  4 +-
 .../io/network/partition/ResultPartition.java   |  4 +-
 .../partition/ResultSubpartitionView.java       |  2 +-
 .../partition/SpillableSubpartition.java        |  8 ++--
 .../partition/SpillableSubpartitionView.java    |  2 +-
 .../partition/consumer/RemoteInputChannel.java  | 12 +++---
 .../AsynchronousBufferFileWriterTest.java       |  2 +-
 .../api/serialization/EventSerializerTest.java  |  4 +-
 .../io/network/api/writer/RecordWriterTest.java |  6 +--
 .../network/buffer/BufferPoolFactoryTest.java   |  4 +-
 .../io/network/buffer/LocalBufferPoolTest.java  | 34 ++++++++---------
 .../network/buffer/NetworkBufferPoolTest.java   |  4 +-
 .../netty/NettyMessageSerializationTest.java    |  2 +-
 .../PartialConsumePipelinedResultTest.java      |  2 +-
 .../partition/PipelinedSubpartitionTest.java    |  6 +--
 .../network/partition/ResultPartitionTest.java  |  6 +--
 .../partition/SpillableSubpartitionTest.java    | 40 ++++++++++----------
 .../consumer/LocalInputChannelTest.java         |  2 +-
 .../consumer/RemoteInputChannelTest.java        | 26 ++++++-------
 .../io/network/util/TestConsumerCallback.java   |  2 +-
 .../network/util/TestSubpartitionConsumer.java  |  2 +-
 .../BackPressureStatsTrackerITCase.java         |  4 +-
 .../streaming/runtime/io/BufferSpiller.java     |  2 +-
 .../runtime/io/StreamInputProcessor.java        |  4 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  4 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |  2 +-
 .../io/BarrierBufferMassiveRandomTest.java      |  2 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  2 +-
 38 files changed, 134 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
index 9a78d0a..0c58f16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -48,7 +48,7 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
 			addRequest(new BufferWriteRequest(this, buffer));
 		} catch (Throwable e) {
 			// if not added, we need to recycle here
-			buffer.recycle();
+			buffer.recycleBuffer();
 			ExceptionUtils.rethrowIOException(e);
 		}
 
@@ -71,12 +71,12 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
 
 		@Override
 		public void requestSuccessful(Buffer buffer) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 
 		@Override
 		public void requestFailed(Buffer buffer, IOException e) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
index 495336c..a30961f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
@@ -47,7 +47,7 @@ public class SynchronousBufferFileReader extends SynchronousFileIOChannel implem
 			hasReachedEndOfFile = reader.readBufferFromFileChannel(buffer);
 		}
 		else {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 29f2b6d..1ac5f75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -74,7 +74,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 				if (result.isBufferConsumed()) {
 					final Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
 
-					currentBuffer.recycle();
+					currentBuffer.recycleBuffer();
 					currentRecordDeserializer = null;
 				}
 
@@ -118,7 +118,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
 			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			deserializer.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 39dbacc..de1cac6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -155,14 +155,14 @@ public class RecordWriter<T extends IOReadableWritable> {
 					}
 
 					// retain the buffer so that it can be recycled by each channel of targetPartition
-					eventBuffer.retain();
+					eventBuffer.retainBuffer();
 					targetPartition.writeBuffer(eventBuffer, targetChannel);
 				}
 			}
 		} finally {
 			// we do not need to further retain the eventBuffer
 			// (it will be recycled after the last channel stops using it)
-			eventBuffer.recycle();
+			eventBuffer.recycleBuffer();
 		}
 	}
 
@@ -192,7 +192,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 					Buffer buffer = serializer.getCurrentBuffer();
 
 					if (buffer != null) {
-						buffer.recycle();
+						buffer.recycleBuffer();
 					}
 				}
 				finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 3a66e53..454a9ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -57,13 +57,13 @@ public interface ResultPartitionWriter {
 		try {
 			for (int subpartition = 0; subpartition < getNumberOfSubpartitions(); subpartition++) {
 				// retain the buffer so that it can be recycled by each channel of targetPartition
-				buffer.retain();
+				buffer.retainBuffer();
 				writeBuffer(buffer, subpartition);
 			}
 		} finally {
 			// we do not need to further retain the eventBuffer
 			// (it will be recycled after the last channel stops using it)
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 12acfdb..f1eeb69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
 
 import org.apache.flink.core.memory.MemorySegment;
 
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
 import java.nio.ByteBuffer;
 
 /**
@@ -74,9 +76,9 @@ public interface Buffer {
 	 * Releases this buffer once, i.e. reduces the reference count and recycles the buffer if the
 	 * reference count reaches <tt>0</tt>.
 	 *
-	 * @see #retain()
+	 * @see #retainBuffer()
 	 */
-	void recycle();
+	void recycleBuffer();
 
 	/**
 	 * Returns whether this buffer has been recycled or not.
@@ -90,9 +92,9 @@ public interface Buffer {
 	 *
 	 * @return <tt>this</tt> instance (for chained calls)
 	 *
-	 * @see #recycle()
+	 * @see #recycleBuffer()
 	 */
-	Buffer retain();
+	Buffer retainBuffer();
 
 	/**
 	 * Returns the maximum size of the buffer, i.e. the capacity of the underlying {@link MemorySegment}.
@@ -149,6 +151,12 @@ public interface Buffer {
 	void setSize(int writerIndex);
 
 	/**
+	 * Returns the number of readable bytes (same as <tt>{@link #getSize()} -
+	 * {@link #getReaderIndex()}</tt>).
+	 */
+	int readableBytes();
+
+	/**
 	 * Gets a new {@link ByteBuffer} instance wrapping this buffer's readable bytes, i.e. between
 	 * {@link #getReaderIndex()} and {@link #getSize()}.
 	 *
@@ -171,4 +179,11 @@ public interface Buffer {
 	 * @see #getNioBufferReadable()
 	 */
 	ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException;
+
+	/**
+	 * Sets the buffer allocator for use in netty.
+	 *
+	 * @param allocator netty buffer allocator
+	 */
+	void setAllocator(ByteBufAllocator allocator);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 4486caa..28c5699 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -137,7 +137,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
-	public void recycle() {
+	public void recycleBuffer() {
 		release();
 	}
 
@@ -147,7 +147,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
-	public NetworkBuffer retain() {
+	public NetworkBuffer retainBuffer() {
 		return (NetworkBuffer) super.retain();
 	}
 
@@ -414,11 +414,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 		return checkNotNull(allocator);
 	}
 
-	/**
-	 * Sets the buffer allocator for use in netty.
-	 *
-	 * @param allocator netty buffer allocator
-	 */
+	@Override
 	public void setAllocator(ByteBufAllocator allocator) {
 		this.allocator = allocator;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 5160853..f05febc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -305,7 +305,7 @@ public abstract class NettyMessage {
 				throw new IOException(t);
 			}
 			finally {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 8e4d8cd..ba7bf22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -430,7 +430,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			finally {
 				if (!success) {
 					if (buffer != null) {
-						buffer.recycle();
+						buffer.recycleBuffer();
 					}
 				}
 			}
@@ -485,7 +485,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
 			finally {
 				if (!success) {
 					if (buffer != null) {
-						buffer.recycle();
+						buffer.recycleBuffer();
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 41f87ae..58df4b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -213,7 +213,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 			}
 		} catch (Throwable t) {
 			if (next != null) {
-				next.buffer().recycle();
+				next.buffer().recycleBuffer();
 			}
 
 			throw new IOException(t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 07a53fb..051f871 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -74,7 +74,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 				return false;
 			}
 
@@ -133,7 +133,7 @@ class PipelinedSubpartition extends ResultSubpartition {
 			// Release all available buffers
 			Buffer buffer;
 			while ((buffer = buffers.poll()) != null) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 
 			view = readView;

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 01c8bfc..aac8fb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -248,13 +248,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 			final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
 
 			// retain for buffer use after add() but also to have a simple path for recycle()
-			buffer.retain();
+			buffer.retainBuffer();
 			success = subpartition.add(buffer);
 		} finally {
 			if (success) {
 				notifyPipelinedConsumers();
 			}
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index fb31592..71753d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -37,7 +37,7 @@ public interface ResultSubpartitionView {
 	 * than the consumer or a spilled queue needs to read in more data.
 	 *
 	 * <p><strong>Important</strong>: The consumer has to make sure that each
-	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
+	 * buffer instance will eventually be recycled with {@link Buffer#recycleBuffer()}
 	 * after it has been consumed.
 	 */
 	@Nullable

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 827a577..e57e30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -104,7 +104,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
 		synchronized (buffers) {
 			if (isFinished || isReleased) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 				return false;
 			}
 
@@ -123,14 +123,14 @@ class SpillableSubpartition extends ResultSubpartition {
 		// Didn't return early => go to disk
 		try {
 			// retain buffer for updateStatistics() below
-			spillWriter.writeBlock(buffer.retain());
+			spillWriter.writeBlock(buffer.retainBuffer());
 			synchronized (buffers) {
 				// See the note above, but only do this if the buffer was correctly added!
 				updateStatistics(buffer);
 				increaseBuffersInBacklog(buffer);
 			}
 		} finally {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 
 		return true;
@@ -162,7 +162,7 @@ class SpillableSubpartition extends ResultSubpartition {
 
 			// Release all available buffers
 			for (Buffer buffer : buffers) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			buffers.clear();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index a16e59a..279a023 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -172,7 +172,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 			// we are never giving this buffer out in getNextBuffer(), so we need to clean it up
 			synchronized (buffers) {
 				if (nextBuffer != null) {
-					nextBuffer.recycle();
+					nextBuffer.recycleBuffer();
 					nextBuffer = null;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 9b0226a..0d633d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -246,7 +246,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 					if (buffer.getRecycler() == this) {
 						exclusiveRecyclingSegments.add(buffer.getMemorySegment());
 					} else {
-						buffer.recycle();
+						buffer.recycleBuffer();
 					}
 				}
 			}
@@ -350,7 +350,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		// Check the isReleased state outside synchronized block first to avoid
 		// deadlock with releaseAllResources running in parallel.
 		if (isReleased.get()) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 			return false;
 		}
 
@@ -361,7 +361,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			// Important: double check the isReleased state inside synchronized block, so there is no
 			// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
 			if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 				return false;
 			}
 
@@ -517,7 +517,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			}
 		} finally {
 			if (!success) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 	}
@@ -599,7 +599,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			exclusiveBuffers.add(buffer);
 			if (getAvailableBufferSize() > numRequiredBuffers) {
 				Buffer floatingBuffer = floatingBuffers.poll();
-				floatingBuffer.recycle();
+				floatingBuffer.recycleBuffer();
 				return 0;
 			} else {
 				return 1;
@@ -635,7 +635,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		void releaseAll(List<MemorySegment> exclusiveSegments) {
 			Buffer buffer;
 			while ((buffer = floatingBuffers.poll()) != null) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			while ((buffer = exclusiveBuffers.poll()) != null) {
 				exclusiveSegments.add(buffer.getMemorySegment());

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 1f78cd9..bc4c42a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -90,7 +90,7 @@ public class AsynchronousBufferFileWriterTest {
 			writer.writeBlock(buffer);
 		} finally {
 			if (!buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 				Assert.fail("buffer not recycled");
 			}
 			assertEquals("Shouln't increment number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index f79ab0f..aa14860 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -104,7 +104,7 @@ public class EventSerializerTest {
 				.fromBuffer(serializedEvent, cl);
 			assertEquals(EndOfPartitionEvent.INSTANCE, event);
 		} finally {
-			serializedEvent.recycle();
+			serializedEvent.recycleBuffer();
 		}
 	}
 
@@ -156,7 +156,7 @@ public class EventSerializerTest {
 			final ClassLoader cl = getClass().getClassLoader();
 			return EventSerializer.isEvent(serializedEvent, eventClass, cl);
 		} finally {
-			serializedEvent.recycle();
+			serializedEvent.recycleBuffer();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ded1817..9e281d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -193,7 +193,7 @@ public class RecordWriterTest {
 				@Override
 				public Void answer(InvocationOnMock invocation) throws Throwable {
 					Buffer buffer = (Buffer) invocation.getArguments()[0];
-					buffer.recycle();
+					buffer.recycleBuffer();
 
 					throw new ExpectedTestException();
 				}
@@ -478,7 +478,7 @@ public class RecordWriterTest {
 				} else {
 					// is event:
 					AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
-					buffer.recycle(); // the buffer is not needed anymore
+					buffer.recycleBuffer(); // the buffer is not needed anymore
 					Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
 					queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
 				}
@@ -530,7 +530,7 @@ public class RecordWriterTest {
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
-				((Buffer) invocation.getArguments()[0]).recycle();
+				((Buffer) invocation.getArguments()[0]).recycleBuffer();
 
 				return null;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index d15aba6..14eb6cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -138,7 +138,7 @@ public class BufferPoolFactoryTest {
 			assertNull(bufferPool2.requestBuffer());
 
 			// as soon as one excess buffer of bufferPool1 is recycled, it should be available for bufferPool2
-			buffers.remove(0).recycle();
+			buffers.remove(0).recycleBuffer();
 			// recycle returns the excess buffer to the network buffer pool
 			assertEquals(1, networkBufferPool.getNumberOfAvailableMemorySegments());
 			// verify the number of buffers taken from the pools
@@ -158,7 +158,7 @@ public class BufferPoolFactoryTest {
 				bufferPool2.bestEffortGetNumOfUsedBuffers() + bufferPool2.getNumberOfAvailableMemorySegments());
 		} finally {
 			for (Buffer buffer : buffers) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			if (bufferPool1 != null) {
 				bufferPool1.lazyDestroy();

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 4eb568a..b04286e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -108,7 +108,7 @@ public class LocalBufferPoolTest {
 		}
 
 		for (Buffer buffer : requests) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 
@@ -142,7 +142,7 @@ public class LocalBufferPoolTest {
 
 		// Recycle should return buffers to memory segment pool
 		for (Buffer buffer : requests) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 
@@ -166,12 +166,12 @@ public class LocalBufferPoolTest {
 		assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
 
 		for (int i = 1; i < numBuffers / 2; i++) {
-			requests.remove(0).recycle();
+			requests.remove(0).recycleBuffer();
 			assertEquals(numBuffers - i, getNumRequestedFromMemorySegmentPool());
 		}
 
 		for (Buffer buffer : requests) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 
@@ -188,7 +188,7 @@ public class LocalBufferPoolTest {
 
 		// Recycle all
 		for (Buffer buffer : requests) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 
 		assertEquals(numBuffers, localBufferPool.getNumberOfAvailableMemorySegments());
@@ -227,13 +227,13 @@ public class LocalBufferPoolTest {
 		// Recycle the first buffer to notify both of the above listeners once
 		// and the twoTimesListener will be added into the registeredListeners
 		// queue of buffer pool again
-		available1.recycle();
+		available1.recycleBuffer();
 
 		verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 		verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 
 		// Recycle the second buffer to only notify the twoTimesListener
-		available2.recycle();
+		available2.recycleBuffer();
 
 		verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
 		verify(twoTimesListener, times(2)).notifyBufferAvailable(any(Buffer.class));
@@ -255,7 +255,7 @@ public class LocalBufferPoolTest {
 
 		localBufferPool.lazyDestroy();
 
-		available.recycle();
+		available.recycleBuffer();
 
 		verify(listener, times(1)).notifyBufferDestroyed();
 	}
@@ -333,7 +333,7 @@ public class LocalBufferPoolTest {
 		List<Buffer> requestedBuffers = f.get(60, TimeUnit.SECONDS);
 
 		for (Buffer buffer : requestedBuffers) {
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 	}
 
@@ -354,7 +354,7 @@ public class LocalBufferPoolTest {
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
 		assertNull(localBufferPool.requestBuffer());
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
-		buffer1.recycle();
+		buffer1.recycleBuffer();
 		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
 
 		// check max number of buffers:
@@ -366,9 +366,9 @@ public class LocalBufferPoolTest {
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
 		assertNull(localBufferPool.requestBuffer());
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
-		buffer1.recycle();
+		buffer1.recycleBuffer();
 		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
-		buffer2.recycle();
+		buffer2.recycleBuffer();
 		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
 
 		// try to set too large buffer size:
@@ -380,9 +380,9 @@ public class LocalBufferPoolTest {
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
 		assertNull(localBufferPool.requestBuffer());
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
-		buffer1.recycle();
+		buffer1.recycleBuffer();
 		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
-		buffer2.recycle();
+		buffer2.recycleBuffer();
 		assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
 
 		// decrease size again
@@ -391,7 +391,7 @@ public class LocalBufferPoolTest {
 		assertNotNull(buffer1 = localBufferPool.requestBuffer());
 		assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
 		assertNull(localBufferPool.requestBuffer());
-		buffer1.recycle();
+		buffer1.recycleBuffer();
 		assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
 	}
 
@@ -410,7 +410,7 @@ public class LocalBufferPoolTest {
 			@Override
 			public boolean notifyBufferAvailable(Buffer buffer) {
 				times++;
-				buffer.recycle();
+				buffer.recycleBuffer();
 				return times < notificationTimes;
 			}
 
@@ -436,7 +436,7 @@ public class LocalBufferPoolTest {
 			try {
 				for (int i = 0; i < numBuffersToRequest; i++) {
 					Buffer buffer = bufferProvider.requestBufferBlocking();
-					buffer.recycle();
+					buffer.recycleBuffer();
 				}
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 4d7648a..739a4d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -146,7 +146,7 @@ public class NetworkBufferPoolTest {
 
 			// the recycled buffers should go to the global pool
 			for (Buffer b : buffers) {
-				b.recycle();
+				b.recycleBuffer();
 			}
 			assertEquals(globalPool.getTotalNumberOfMemorySegments(), globalPool.getNumberOfAvailableMemorySegments());
 
@@ -284,7 +284,7 @@ public class NetworkBufferPoolTest {
 				}
 
 				for (Buffer buffer : buffers) {
-					buffer.recycle();
+					buffer.recycleBuffer();
 				}
 			});
 			bufferRecycler.start();

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index ed5fff8..53ecda3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -67,7 +67,7 @@ public class NettyMessageSerializationTest {
 			NettyMessage.BufferResponse actual = encodeAndDecode(expected);
 
 			// Verify recycle has been called on buffer instance
-			verify(buffer, times(1)).recycle();
+			verify(buffer, times(1)).recycleBuffer();
 
 			final ByteBuf retainedSlice = actual.getNettyBuffer();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 9481d57..68deec7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -142,7 +142,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 			InputGate gate = getEnvironment().getInputGate(0);
 			Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
 			if (buffer != null) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 56ed622..40be8df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -237,7 +237,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
 				numberOfBuffers++;
 
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 
 			@Override
@@ -311,11 +311,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		} finally {
 			buffer1Recycled = buffer1.isRecycled();
 			if (!buffer1Recycled) {
-				buffer1.recycle();
+				buffer1.recycleBuffer();
 			}
 			buffer2Recycled = buffer2.isRecycled();
 			if (!buffer2Recycled) {
-				buffer2.recycle();
+				buffer2.recycleBuffer();
 			}
 		}
 		if (!buffer1Recycled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 5f5f459..9c02b65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -128,7 +128,7 @@ public class ResultPartitionTest {
 		} finally {
 			if (!buffer.isRecycled()) {
 				Assert.fail("buffer not recycled");
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			// should not have notified either
 			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
@@ -162,7 +162,7 @@ public class ResultPartitionTest {
 		} finally {
 			if (!buffer.isRecycled()) {
 				Assert.fail("buffer not recycled");
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			// should not have notified either
 			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
@@ -224,7 +224,7 @@ public class ResultPartitionTest {
 			assertFalse("buffer should not be recycled (still in the queue)", buffer.isRecycled());
 		} finally {
 			if (!buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			// should have been notified for pipelined partitions
 			if (pipelined.isPipelined()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 9664945..4a03321 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -194,8 +194,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpillableSubpartition partition = createSubpartition();
 
 		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
-		buffer.retain();
-		buffer.retain();
+		buffer.retainBuffer();
+		buffer.retainBuffer();
 
 		partition.add(buffer);
 		partition.add(buffer);
@@ -243,7 +243,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertNotSame(buffer, read);
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		read = reader.getNextBuffer();
@@ -252,7 +252,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertNotSame(buffer, read);
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		read = reader.getNextBuffer();
@@ -261,7 +261,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertNotSame(buffer, read);
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		// End of partition
@@ -272,7 +272,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(EndOfPartitionEvent.class,
 			EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		// finally check that the buffer has been freed after a successful (or failed) write
@@ -292,8 +292,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpillableSubpartition partition = createSubpartition();
 
 		Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
-		buffer.retain();
-		buffer.retain();
+		buffer.retainBuffer();
+		buffer.retainBuffer();
 
 		partition.add(buffer);
 		partition.add(buffer);
@@ -318,7 +318,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertSame(buffer, read.buffer());
 		assertEquals(2, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertEquals(2, listener.getNumNotifiedBuffers());
 		assertFalse(buffer.isRecycled());
 
@@ -338,7 +338,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(1, partition.getBuffersInBacklog());
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertSame(buffer, read.buffer());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		// now the buffer may be freed, depending on the timing of the write operation
 		// -> let's do this check at the end of the test (to save some time)
 
@@ -348,7 +348,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
 		assertNotSame(buffer, read.buffer());
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		// End of partition
@@ -359,7 +359,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(EndOfPartitionEvent.class,
 			EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
 		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycle();
+		read.buffer().recycleBuffer();
 		assertTrue(read.buffer().isRecycled());
 
 		// finally check that the buffer has been freed after a successful (or failed) write
@@ -408,7 +408,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			partition.add(buffer);
 		} finally {
 			if (!buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 				Assert.fail("buffer not recycled");
 			}
 		}
@@ -454,7 +454,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		} finally {
 			bufferRecycled = buffer.isRecycled();
 			if (!bufferRecycled) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 		if (!bufferRecycled) {
@@ -483,7 +483,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			ioManager.shutdown();
 			bufferRecycled = buffer.isRecycled();
 			if (!bufferRecycled) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 		if (bufferRecycled) {
@@ -546,10 +546,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		} finally {
 			ioManager.shutdown();
 			if (!buffer1.isRecycled()) {
-				buffer1.recycle();
+				buffer1.recycleBuffer();
 			}
 			if (!buffer2.isRecycled()) {
-				buffer2.recycle();
+				buffer2.recycleBuffer();
 			}
 		}
 		// note: a view requires a finished partition which has an additional EndOfPartitionEvent
@@ -577,7 +577,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 			ioManager.shutdown();
 			bufferRecycled = buffer.isRecycled();
 			if (!bufferRecycled) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 		}
 		if (!bufferRecycled) {
@@ -666,11 +666,11 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		} finally {
 			buffer1Recycled = buffer1.isRecycled();
 			if (!buffer1Recycled) {
-				buffer1.recycle();
+				buffer1.recycleBuffer();
 			}
 			buffer2Recycled = buffer2.isRecycled();
 			if (!buffer2Recycled) {
-				buffer2.recycle();
+				buffer2.recycleBuffer();
 			}
 		}
 		if (!buffer1Recycled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 16cd90d..2f68418 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -518,7 +518,7 @@ public class LocalInputChannelTest {
 				BufferOrEvent boe;
 				while ((boe = inputGate.getNextBufferOrEvent()) != null) {
 					if (boe.isBuffer()) {
-						boe.getBuffer().recycle();
+						boe.getBuffer().recycleBuffer();
 
 						// Check that we don't receive too many buffers
 						if (++numberOfBuffersPerChannel[boe.getChannelIndex()]

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 469aa97..8c8b96a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -72,7 +72,7 @@ public class RemoteInputChannelTest {
 		final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
 
 		// The test
-		inputChannel.onBuffer(buffer.retain(), 0, -1);
+		inputChannel.onBuffer(buffer.retainBuffer(), 0, -1);
 
 		// This does not yet throw the exception, but sets the error at the channel.
 		inputChannel.onBuffer(buffer, 29, -1);
@@ -119,7 +119,7 @@ public class RemoteInputChannelTest {
 							for (int j = 0; j < 128; j++) {
 								// this is the same buffer over and over again which will be
 								// recycled by the RemoteInputChannel
-								inputChannel.onBuffer(buffer.retain(), j, -1);
+								inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
 							}
 
 							if (inputChannel.isReleased()) {
@@ -155,7 +155,7 @@ public class RemoteInputChannelTest {
 		finally {
 			executor.shutdown();
 			assertFalse(buffer.isRecycled());
-			buffer.recycle();
+			buffer.recycleBuffer();
 			assertTrue(buffer.isRecycled());
 		}
 	}
@@ -372,7 +372,7 @@ public class RemoteInputChannelTest {
 				0, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one floating buffer
-			floatingBufferQueue.poll().recycle();
+			floatingBufferQueue.poll().recycleBuffer();
 
 			// Assign the floating buffer to the listener and the channel is still waiting for more floating buffers
 			verify(bufferPool, times(15)).requestBuffer();
@@ -385,7 +385,7 @@ public class RemoteInputChannelTest {
 				0, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one more floating buffer
-			floatingBufferQueue.poll().recycle();
+			floatingBufferQueue.poll().recycleBuffer();
 
 			// Assign the floating buffer to the listener and the channel is still waiting for more floating buffers
 			verify(bufferPool, times(15)).requestBuffer();
@@ -411,7 +411,7 @@ public class RemoteInputChannelTest {
 				0, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one exclusive buffer
-			exclusiveBuffer.recycle();
+			exclusiveBuffer.recycleBuffer();
 
 			// The exclusive buffer is returned to the channel directly
 			verify(bufferPool, times(15)).requestBuffer();
@@ -474,7 +474,7 @@ public class RemoteInputChannelTest {
 				0, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one floating buffer
-			floatingBuffer.recycle();
+			floatingBuffer.recycleBuffer();
 
 			// The floating buffer is returned to local buffer directly because the channel is not waiting
 			// for floating buffers
@@ -488,7 +488,7 @@ public class RemoteInputChannelTest {
 				1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one exclusive buffer
-			exclusiveBuffer.recycle();
+			exclusiveBuffer.recycleBuffer();
 
 			// Return one extra floating buffer to the local pool because the number of available buffers
 			// already equals to required buffers
@@ -566,7 +566,7 @@ public class RemoteInputChannelTest {
 				0, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one exclusive buffer
-			exclusiveBuffer.recycle();
+			exclusiveBuffer.recycleBuffer();
 
 			// Return one extra floating buffer to the local pool because the number of available buffers
 			// is more than required buffers
@@ -580,7 +580,7 @@ public class RemoteInputChannelTest {
 				1, bufferPool.getNumberOfAvailableMemorySegments());
 
 			// Recycle one floating buffer
-			floatingBuffer.recycle();
+			floatingBuffer.recycleBuffer();
 
 			// The floating buffer is returned to local pool directly because the channel is not waiting for
 			// floating buffers
@@ -654,7 +654,7 @@ public class RemoteInputChannelTest {
 
 			// Recycle three floating buffers to trigger notify buffer available
 			for (Buffer buffer : floatingBuffers) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 
 			verify(channel1, times(1)).notifyBufferAvailable(any(Buffer.class));
@@ -920,7 +920,7 @@ public class RemoteInputChannelTest {
 			@Override
 			public Void call() throws Exception {
 				for (Buffer buffer : exclusiveBuffers) {
-					buffer.recycle();
+					buffer.recycleBuffer();
 				}
 
 				return null;
@@ -948,7 +948,7 @@ public class RemoteInputChannelTest {
 			@Override
 			public Void call() throws Exception {
 				for (Buffer buffer : floatingBuffers) {
-					buffer.recycle();
+					buffer.recycleBuffer();
 				}
 
 				return null;

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
index 0d66f13..f07c5d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -69,7 +69,7 @@ public interface TestConsumerCallback {
 		public void onBuffer(Buffer buffer) {
 			super.onBuffer(buffer);
 
-			buffer.recycle();
+			buffer.recycleBuffer();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 37137ff..b4bdd3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -109,7 +109,7 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila
 
 						callback.onEvent(event);
 
-						bufferAndBacklog.buffer().recycle();
+						bufferAndBacklog.buffer().recycleBuffer();
 
 						if (event.getClass() == EndOfPartitionEvent.class) {
 							subpartitionView.notifySubpartitionConsumed();

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index 7c9e80b..b1a4cd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,7 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 							// 2) Release all buffers and let the tasks grab one
 							//
 							for (Buffer buf : buffers) {
-								buf.recycle();
+								buf.recycleBuffer();
 								Assert.assertTrue(buf.isRecycled());
 							}
 
@@ -322,7 +322,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
 			while (true) {
 				Buffer buffer = testBufferPool.requestBufferBlocking();
 				// Got a buffer, yay!
-				buffer.recycle();
+				buffer.recycleBuffer();
 
 				new CountDownLatch(1).await();
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 7f6e875..33aac7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -151,7 +151,7 @@ public class BufferSpiller {
 		}
 		finally {
 			if (boe.isBuffer()) {
-				boe.getBuffer().recycle();
+				boe.getBuffer().recycleBuffer();
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index cf8f452..9f526a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -188,7 +188,7 @@ public class StreamInputProcessor<IN> {
 				DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
 
 				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
 					currentRecordDeserializer = null;
 				}
 
@@ -273,7 +273,7 @@ public class StreamInputProcessor<IN> {
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
 			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			deserializer.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index b99ad53..a3d9236 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -225,7 +225,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 				}
 
 				if (result.isBufferConsumed()) {
-					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
 					currentRecordDeserializer = null;
 				}
 
@@ -338,7 +338,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 		for (RecordDeserializer<?> deserializer : recordDeserializers) {
 			Buffer buffer = deserializer.getCurrentBuffer();
 			if (buffer != null && !buffer.isRecycled()) {
-				buffer.recycle();
+				buffer.recycleBuffer();
 			}
 			deserializer.clear();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index cca6fb3..9a59737 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -282,7 +282,7 @@ public class BarrierBufferAlignmentLimitTest {
 		buf.setSize(size);
 
 		// retain an additional time so it does not get disposed after being read by the input gate
-		buf.retain();
+		buf.retainBuffer();
 
 		return new BufferOrEvent(buf, channel);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 090e44a..146be75 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -67,7 +67,7 @@ public class BarrierBufferMassiveRandomTest {
 			for (int i = 0; i < 2000000; i++) {
 				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
 				if (boe.isBuffer()) {
-					boe.getBuffer().recycle();
+					boe.getBuffer().recycleBuffer();
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index c7c9df2..4a03445 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1411,7 +1411,7 @@ public class BarrierBufferTest {
 		buf.setSize(size);
 
 		// retain an additional time so it does not get disposed after being read by the input gate
-		buf.retain();
+		buf.retainBuffer();
 
 		return new BufferOrEvent(buf, channel);
 	}


[2/2] flink git commit: [FLINK-8395][network] add a read-only sliced ByteBuf implementation based on NetworkBuffer

Posted by sr...@apache.org.
[FLINK-8395][network] add a read-only sliced ByteBuf implementation based on NetworkBuffer

This closes #5288.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665347cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665347cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665347cf

Branch: refs/heads/master
Commit: 665347cf867f0b807fe9d4787b5ef42b0581b510
Parents: db440f2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 9 22:57:18 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:35:25 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/Buffer.java |  24 ++
 .../io/network/buffer/NetworkBuffer.java        |  10 +
 .../buffer/ReadOnlySlicedNetworkBuffer.java     | 199 +++++++++++
 .../runtime/io/network/buffer/BufferTest.java   |  78 ++++-
 .../buffer/ReadOnlySlicedBufferTest.java        | 326 +++++++++++++++++++
 5 files changed, 629 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index f1eeb69..3c9933e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -97,6 +97,30 @@ public interface Buffer {
 	Buffer retainBuffer();
 
 	/**
+	 * Returns a read-only slice of this buffer's readable bytes, i.e. between
+	 * {@link #getReaderIndex()} and {@link #getSize()}.
+	 *
+	 * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+	 * shared but the slice is not {@link #retainBuffer() retained} automatically.
+	 *
+	 * @return a read-only sliced buffer
+	 */
+	Buffer readOnlySlice();
+
+	/**
+	 * Returns a read-only slice of this buffer.
+	 *
+	 * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+	 * shared but the slice is not {@link #retainBuffer() retained} automatically.
+	 *
+	 * @param index the index to start from
+	 * @param length the length of the slice
+	 *
+	 * @return a read-only sliced buffer
+	 */
+	Buffer readOnlySlice(int index, int length);
+
+	/**
 	 * Returns the maximum size of the buffer, i.e. the capacity of the underlying {@link MemorySegment}.
 	 *
 	 * @return size of the buffer

http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 28c5699..4ca50db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -152,6 +152,16 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 	}
 
 	@Override
+	public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+		return readOnlySlice(readerIndex(), readableBytes());
+	}
+
+	@Override
+	public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+		return new ReadOnlySlicedNetworkBuffer(this, index, length);
+	}
+
+	@Override
 	protected void deallocate() {
 		recycler.recycle(memorySegment);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
new file mode 100644
index 0000000..f268314
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ReadOnlyByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.SlicedByteBuf;
+
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+
+/**
+ * Minimal best-effort read-only sliced {@link Buffer} implementation wrapping a
+ * {@link NetworkBuffer}'s sub-region based on <tt>io.netty.buffer.SlicedByteBuf</tt> and
+ * <tt>io.netty.buffer.ReadOnlyByteBuf</tt>.
+ *
+ * <p><strong>BEWARE:</strong> We do not guarantee to block every operation that is able to write
+ * data, but all returned data structures should be handled as if they were read-only!
+ */
+public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implements Buffer {
+
+	/**
+	 * Creates a buffer which shares the memory segment of the given buffer and exposes the given
+	 * sub-region only.
+	 *
+	 * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+	 * shared but the slice is not {@link #retainBuffer() retained} automatically.
+	 *
+	 * @param buffer the buffer to derive from
+	 * @param index the index to start from
+	 * @param length the length of the slice
+	 */
+	ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
+		super(new SlicedByteBuf(buffer, index, length));
+	}
+
+	/**
+	 * Creates a buffer which shares the memory segment of the given buffer and exposes the given
+	 * sub-region only.
+	 *
+	 * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+	 * shared but the slice is not {@link #retainBuffer() retained} automatically.
+	 *
+	 * @param buffer the buffer to derive from
+	 * @param index the index to start from
+	 * @param length the length of the slice
+	 */
+	private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length) {
+		super(new SlicedByteBuf(buffer, index, length));
+	}
+
+	@Override
+	public ByteBuf unwrap() {
+		return super.unwrap().unwrap();
+	}
+
+	@Override
+	public boolean isBuffer() {
+		return ((Buffer) unwrap()).isBuffer();
+	}
+
+	@Override
+	public void tagAsEvent() {
+		throw new ReadOnlyBufferException();
+	}
+
+	/**
+	 * Returns the underlying memory segment.
+	 *
+	 * <p><strong>BEWARE:</strong> Although we cannot set the memory segment read-only it should be
+	 * handled as if it were read-only!
+	 *
+	 * @return the memory segment backing this buffer
+	 */
+	@Override
+	public MemorySegment getMemorySegment() {
+		return ((Buffer) unwrap()).getMemorySegment();
+	}
+
+	@Override
+	public BufferRecycler getRecycler() {
+		return ((Buffer) unwrap()).getRecycler();
+	}
+
+	@Override
+	public void recycleBuffer() {
+		((Buffer) unwrap()).recycleBuffer();
+	}
+
+	@Override
+	public boolean isRecycled() {
+		return ((Buffer) unwrap()).isRecycled();
+	}
+
+	@Override
+	public ReadOnlySlicedNetworkBuffer retainBuffer() {
+		((Buffer) unwrap()).retainBuffer();
+		return this;
+	}
+
+	@Override
+	public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+		return readOnlySlice(readerIndex(), readableBytes());
+	}
+
+	@Override
+	public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+		return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length);
+	}
+
+	@Override
+	public int getMaxCapacity() {
+		return maxCapacity();
+	}
+
+	@Override
+	public int getReaderIndex() {
+		return readerIndex();
+	}
+
+	@Override
+	public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+		readerIndex(readerIndex);
+	}
+
+	@Override
+	public int getSizeUnsafe() {
+		return writerIndex();
+	}
+
+	@Override
+	public int getSize() {
+		return writerIndex();
+	}
+
+	@Override
+	public void setSize(int writerIndex) {
+		writerIndex(writerIndex);
+	}
+
+	@Override
+	public ByteBuffer getNioBufferReadable() {
+		return nioBuffer();
+	}
+
+	@Override
+	public ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException {
+		return nioBuffer(index, length);
+	}
+
+	@Override
+	public ByteBuffer nioBuffer(int index, int length) {
+		return super.nioBuffer(index, length).asReadOnlyBuffer();
+	}
+
+	@Override
+	public boolean isWritable() {
+		return false;
+	}
+
+	@Override
+	public boolean isWritable(int numBytes) {
+		return false;
+	}
+
+	@Override
+	public ByteBuf ensureWritable(int minWritableBytes) {
+		// note: ReadOnlyByteBuf allows this but in most cases this does not make sense
+		if (minWritableBytes != 0) {
+			throw new ReadOnlyBufferException();
+		}
+		return this;
+	}
+
+	@Override
+	public void setAllocator(ByteBufAllocator allocator) {
+		((Buffer) unwrap()).setAllocator(allocator);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index f59fa9f..90516e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -166,13 +166,13 @@ public class BufferTest extends AbstractByteBufTest {
 	}
 
 	/**
-	 * Tests that {@link NetworkBuffer#recycle()} and {@link NetworkBuffer#isRecycled()} are coupled
-	 * and are also consistent with {@link NetworkBuffer#refCnt()}.
+	 * Tests that {@link NetworkBuffer#recycleBuffer()} and {@link NetworkBuffer#isRecycled()} are
+	 * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
 	 */
 	private static void testRecycleBuffer(boolean isBuffer) {
 		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
 		assertFalse(buffer.isRecycled());
-		buffer.recycle();
+		buffer.recycleBuffer();
 		assertTrue(buffer.isRecycled());
 		assertEquals(0, buffer.refCnt());
 	}
@@ -188,18 +188,74 @@ public class BufferTest extends AbstractByteBufTest {
 	}
 
 	/**
-	 * Tests that {@link NetworkBuffer#retain()} and {@link NetworkBuffer#isRecycled()} are coupled
-	 * and are also consistent with {@link NetworkBuffer#refCnt()}.
+	 * Tests that {@link NetworkBuffer#retainBuffer()} and {@link NetworkBuffer#isRecycled()} are
+	 * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
 	 */
 	private static void testRetainBuffer(boolean isBuffer) {
 		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
 		assertFalse(buffer.isRecycled());
-		buffer.retain();
+		buffer.retainBuffer();
 		assertFalse(buffer.isRecycled());
 		assertEquals(2, buffer.refCnt());
 	}
 
 	@Test
+	public void testDataBufferCreateSlice1() {
+		testCreateSlice1(true);
+	}
+
+	@Test
+	public void testEventBufferCreateSlice1() {
+		testCreateSlice1(false);
+	}
+
+	private static void testCreateSlice1(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		buffer.setSize(10); // fake some data
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(10, slice.getSize());
+		assertEquals(10, slice.getSizeUnsafe());
+		assertSame(buffer, slice.unwrap());
+
+		// slice indices should be independent:
+		buffer.setSize(8);
+		buffer.setReaderIndex(2);
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(10, slice.getSize());
+		assertEquals(10, slice.getSizeUnsafe());
+	}
+
+	@Test
+	public void testDataBufferCreateSlice2() {
+		testCreateSlice2(true);
+	}
+
+	@Test
+	public void testEventBufferCreateSlice2() {
+		testCreateSlice2(false);
+	}
+
+	private static void testCreateSlice2(boolean isBuffer) {
+		NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+		buffer.setSize(2); // fake some data
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 10);
+
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(10, slice.getSize());
+		assertEquals(10, slice.getSizeUnsafe());
+		assertSame(buffer, slice.unwrap());
+
+		// slice indices should be independent:
+		buffer.setSize(8);
+		buffer.setReaderIndex(2);
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(10, slice.getSize());
+		assertEquals(10, slice.getSizeUnsafe());
+	}
+
+	@Test
 	public void testDataBufferGetMaxCapacity() {
 		testGetMaxCapacity(true);
 	}
@@ -330,7 +386,10 @@ public class BufferTest extends AbstractByteBufTest {
 	@Test
 	public void testGetNioBufferReadableThreadSafe() {
 		NetworkBuffer buffer = newBuffer(1024, 1024);
+		testGetNioBufferReadableThreadSafe(buffer);
+	}
 
+	static void testGetNioBufferReadableThreadSafe(Buffer buffer) {
 		ByteBuffer buf1 = buffer.getNioBufferReadable();
 		ByteBuffer buf2 = buffer.getNioBufferReadable();
 
@@ -381,9 +440,12 @@ public class BufferTest extends AbstractByteBufTest {
 	@Test
 	public void testGetNioBufferThreadSafe() {
 		NetworkBuffer buffer = newBuffer(1024, 1024);
+		testGetNioBufferThreadSafe(buffer, 10);
+	}
 
-		ByteBuffer buf1 = buffer.getNioBuffer(0, 10);
-		ByteBuffer buf2 = buffer.getNioBuffer(0, 10);
+	static void testGetNioBufferThreadSafe(Buffer buffer, int length) {
+		ByteBuffer buf1 = buffer.getNioBuffer(0, length);
+		ByteBuffer buf2 = buffer.getNioBuffer(0, length);
 
 		assertNotNull(buf1);
 		assertNotNull(buf2);

http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
new file mode 100644
index 0000000..c190bcf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ReadOnlySlicedNetworkBuffer}.
+ */
+public class ReadOnlySlicedBufferTest {
+	private static final int BUFFER_SIZE = 1024;
+	private static final int DATA_SIZE = 10;
+
+	private NetworkBuffer buffer;
+
+	@Before
+	public void setUp() throws Exception {
+		final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+		buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, DATA_SIZE);
+		buffer.setSize(DATA_SIZE);
+	}
+
+	@Test
+	public void testForwardsIsBuffer() throws IOException {
+		assertEquals(buffer.isBuffer(), buffer.readOnlySlice().isBuffer());
+		assertEquals(buffer.isBuffer(), buffer.readOnlySlice(1, 2).isBuffer());
+		NetworkBuffer eventBuffer = (NetworkBuffer) EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+		assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice().isBuffer());
+		assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice(1, 2).isBuffer());
+	}
+
+	@Test(expected = ReadOnlyBufferException.class)
+	public void testTagAsEventThrows1() {
+		buffer.readOnlySlice().tagAsEvent();
+	}
+
+	@Test(expected = ReadOnlyBufferException.class)
+	public void testTagAsEventThrows2() {
+		buffer.readOnlySlice(1, 2).tagAsEvent();
+	}
+
+	@Test
+	public void testForwardsGetMemorySegment() {
+		assertSame(buffer.getMemorySegment(), buffer.readOnlySlice().getMemorySegment());
+		assertSame(buffer.getMemorySegment(), buffer.readOnlySlice(1, 2).getMemorySegment());
+	}
+
+	@Test
+	public void testForwardsGetRecycler() {
+		assertSame(buffer.getRecycler(), buffer.readOnlySlice().getRecycler());
+		assertSame(buffer.getRecycler(), buffer.readOnlySlice(1, 2).getRecycler());
+	}
+
+	/**
+	 * Tests forwarding of both {@link ReadOnlySlicedNetworkBuffer#recycleBuffer()} and
+	 * {@link ReadOnlySlicedNetworkBuffer#isRecycled()}.
+	 */
+	@Test
+	public void testForwardsRecycleBuffer1() {
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+		assertFalse(slice.isRecycled());
+		slice.recycleBuffer();
+		assertTrue(slice.isRecycled());
+		assertTrue(buffer.isRecycled());
+	}
+
+	/**
+	 * Tests forwarding of both {@link ReadOnlySlicedNetworkBuffer#recycleBuffer()} and
+	 * {@link ReadOnlySlicedNetworkBuffer#isRecycled()}.
+	 */
+	@Test
+	public void testForwardsRecycleBuffer2() {
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 2);
+		assertFalse(slice.isRecycled());
+		slice.recycleBuffer();
+		assertTrue(slice.isRecycled());
+		assertTrue(buffer.isRecycled());
+	}
+
+	/**
+	 * Tests forwarding of {@link ReadOnlySlicedNetworkBuffer#retainBuffer()}: the reference counts
+	 * of the slice and of the underlying buffer must stay equal.
+	 */
+	@Test
+	public void testForwardsRetainBuffer1() {
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+		assertEquals(buffer.refCnt(), slice.refCnt());
+		slice.retainBuffer();
+		assertEquals(buffer.refCnt(), slice.refCnt());
+	}
+
+	/**
+	 * Tests forwarding of {@link ReadOnlySlicedNetworkBuffer#retainBuffer()}: the reference counts
+	 * of the slice and of the underlying buffer must stay equal.
+	 */
+	@Test
+	public void testForwardsRetainBuffer2() {
+		ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 2);
+		assertEquals(buffer.refCnt(), slice.refCnt());
+		slice.retainBuffer();
+		assertEquals(buffer.refCnt(), slice.refCnt());
+	}
+
+	@Test
+	public void testCreateSlice1() {
+		ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+		ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
+		ByteBuf unwrap = slice2.unwrap();
+		assertSame(buffer, unwrap);
+	}
+
+	@Test
+	public void testCreateSlice2() {
+		ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+		ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
+		ByteBuf unwrap = slice2.unwrap();
+		assertSame(buffer, unwrap);
+	}
+
+	@Test
+	public void testCreateSlice3() {
+		ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 2);
+		ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
+		ByteBuf unwrap = slice2.unwrap();
+		assertSame(buffer, unwrap);
+	}
+
+	@Test
+	public void testCreateSlice4() {
+		ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 5);
+		ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
+		ByteBuf unwrap = slice2.unwrap();
+		assertSame(buffer, unwrap);
+	}
+
+	@Test
+	public void testGetMaxCapacity() {
+		assertEquals(DATA_SIZE, buffer.readOnlySlice().getMaxCapacity());
+		assertEquals(2, buffer.readOnlySlice(1, 2).getMaxCapacity());
+	}
+
+	/**
+	 * Tests the independence of the reader index via
+	 * {@link ReadOnlySlicedNetworkBuffer#setReaderIndex(int)} and
+	 * {@link ReadOnlySlicedNetworkBuffer#getReaderIndex()}.
+	 */
+	@Test
+	public void testGetSetReaderIndex1() {
+		testGetSetReaderIndex(buffer.readOnlySlice());
+	}
+
+	/**
+	 * Tests the independence of the reader index via
+	 * {@link ReadOnlySlicedNetworkBuffer#setReaderIndex(int)} and
+	 * {@link ReadOnlySlicedNetworkBuffer#getReaderIndex()}.
+	 */
+	@Test
+	public void testGetSetReaderIndex2() {
+		testGetSetReaderIndex(buffer.readOnlySlice(1, 2));
+	}
+
+	private void testGetSetReaderIndex(ReadOnlySlicedNetworkBuffer slice) {
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(0, slice.getReaderIndex());
+		slice.setReaderIndex(1);
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(1, slice.getReaderIndex());
+	}
+
+	/**
+	 * Tests the independence of the writer index via
+	 * {@link ReadOnlySlicedNetworkBuffer#setSize(int)},
+	 * {@link ReadOnlySlicedNetworkBuffer#getSize()}, and
+	 * {@link ReadOnlySlicedNetworkBuffer#getSizeUnsafe()}.
+	 */
+	@Test
+	public void testGetSetSize1() {
+		testGetSetSize(buffer.readOnlySlice(), DATA_SIZE);
+	}
+
+	/**
+	 * Tests the independence of the writer index via
+	 * {@link ReadOnlySlicedNetworkBuffer#setSize(int)},
+	 * {@link ReadOnlySlicedNetworkBuffer#getSize()}, and
+	 * {@link ReadOnlySlicedNetworkBuffer#getSizeUnsafe()}.
+	 */
+	@Test
+	public void testGetSetSize2() {
+		testGetSetSize(buffer.readOnlySlice(1, 2), 2);
+	}
+
+	private void testGetSetSize(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+		assertEquals(DATA_SIZE, buffer.getSize());
+		assertEquals(DATA_SIZE, buffer.getSizeUnsafe());
+		assertEquals(sliceSize, slice.getSize());
+		assertEquals(sliceSize, slice.getSizeUnsafe());
+		buffer.setSize(DATA_SIZE + 1);
+		assertEquals(DATA_SIZE + 1, buffer.getSize());
+		assertEquals(DATA_SIZE + 1, buffer.getSizeUnsafe());
+		assertEquals(sliceSize, slice.getSize());
+		assertEquals(sliceSize, slice.getSizeUnsafe());
+	}
+
+	@Test
+	public void testReadableBytes() {
+		assertEquals(buffer.readableBytes(), buffer.readOnlySlice().readableBytes());
+		assertEquals(2, buffer.readOnlySlice(1, 2).readableBytes());
+	}
+
+	@Test
+	public void testGetNioBufferReadable1() {
+		testGetNioBufferReadable(buffer.readOnlySlice(), DATA_SIZE);
+	}
+
+	@Test
+	public void testGetNioBufferReadable2() {
+		testGetNioBufferReadable(buffer.readOnlySlice(1, 2), 2);
+	}
+
+	private void testGetNioBufferReadable(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+		ByteBuffer sliceByteBuffer = slice.getNioBufferReadable();
+		assertTrue(sliceByteBuffer.isReadOnly());
+		assertEquals(sliceSize, sliceByteBuffer.remaining());
+		assertEquals(sliceSize, sliceByteBuffer.limit());
+		assertEquals(sliceSize, sliceByteBuffer.capacity());
+
+		// modify sliceByteBuffer position and verify nothing has changed in the original buffer
+		sliceByteBuffer.position(1);
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(DATA_SIZE, buffer.getSize());
+		assertEquals(sliceSize, slice.getSize());
+	}
+
+	@Test
+	public void testGetNioBuffer1() {
+		testGetNioBuffer(buffer.readOnlySlice(), DATA_SIZE);
+	}
+
+	@Test
+	public void testGetNioBuffer2() {
+		testGetNioBuffer(buffer.readOnlySlice(1, 2), 2);
+	}
+
+	private void testGetNioBuffer(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+		ByteBuffer sliceByteBuffer = slice.getNioBuffer(1, 1);
+		assertTrue(sliceByteBuffer.isReadOnly());
+		assertEquals(1, sliceByteBuffer.remaining());
+		assertEquals(1, sliceByteBuffer.limit());
+		assertEquals(1, sliceByteBuffer.capacity());
+
+		// modify sliceByteBuffer position and verify nothing has changed in the original buffer
+		sliceByteBuffer.position(1);
+		assertEquals(0, buffer.getReaderIndex());
+		assertEquals(0, slice.getReaderIndex());
+		assertEquals(DATA_SIZE, buffer.getSize());
+		assertEquals(sliceSize, slice.getSize());
+	}
+
+	@Test
+	public void testGetNioBufferReadableThreadSafe1() {
+		BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice());
+	}
+
+	@Test
+	public void testGetNioBufferReadableThreadSafe2() {
+		BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice(1, 2));
+	}
+
+	@Test
+	public void testGetNioBufferThreadSafe1() {
+		BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(), DATA_SIZE);
+	}
+
+	@Test
+	public void testGetNioBufferThreadSafe2() {
+		BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(1, 2), 2);
+	}
+
+	@Test
+	public void testForwardsSetAllocator() {
+		testForwardsSetAllocator(buffer.readOnlySlice());
+		testForwardsSetAllocator(buffer.readOnlySlice(1, 2));
+	}
+
+	private void testForwardsSetAllocator(ReadOnlySlicedNetworkBuffer slice) {
+		NettyBufferPool allocator = new NettyBufferPool(1);
+		slice.setAllocator(allocator);
+		assertSame(buffer.alloc(), slice.alloc());
+		assertSame(allocator, slice.alloc());
+	}
+}