You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:36:17 UTC
[1/2] flink git commit: [hotfix][network] rename Buffer#retain() and
#recycle() in preparation for FLINK-8396 and FLINK-8395
Repository: flink
Updated Branches:
refs/heads/master 9d0dfcba6 -> 665347cf8
[hotfix][network] rename Buffer#retain() and #recycle() in preparation for FLINK-8396 and FLINK-8395
Since these two methods also exist in Netty's ByteBuf, we would otherwise get
into overloading conflicts.
Also add Buffer#readableBytes() and Buffer#setAllocator().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/db440f24
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/db440f24
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/db440f24
Branch: refs/heads/master
Commit: db440f2434423a23207ba666b33f4ccb55adede5
Parents: 9d0dfcb
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 9 22:56:38 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:31:18 2018 +0100
----------------------------------------------------------------------
.../iomanager/AsynchronousBufferFileWriter.java | 6 +--
.../iomanager/SynchronousBufferFileReader.java | 2 +-
.../api/reader/AbstractRecordReader.java | 4 +-
.../io/network/api/writer/RecordWriter.java | 6 +--
.../api/writer/ResultPartitionWriter.java | 4 +-
.../flink/runtime/io/network/buffer/Buffer.java | 23 +++++++++--
.../io/network/buffer/NetworkBuffer.java | 10 ++---
.../runtime/io/network/netty/NettyMessage.java | 2 +-
.../netty/PartitionRequestClientHandler.java | 4 +-
.../io/network/netty/PartitionRequestQueue.java | 2 +-
.../partition/PipelinedSubpartition.java | 4 +-
.../io/network/partition/ResultPartition.java | 4 +-
.../partition/ResultSubpartitionView.java | 2 +-
.../partition/SpillableSubpartition.java | 8 ++--
.../partition/SpillableSubpartitionView.java | 2 +-
.../partition/consumer/RemoteInputChannel.java | 12 +++---
.../AsynchronousBufferFileWriterTest.java | 2 +-
.../api/serialization/EventSerializerTest.java | 4 +-
.../io/network/api/writer/RecordWriterTest.java | 6 +--
.../network/buffer/BufferPoolFactoryTest.java | 4 +-
.../io/network/buffer/LocalBufferPoolTest.java | 34 ++++++++---------
.../network/buffer/NetworkBufferPoolTest.java | 4 +-
.../netty/NettyMessageSerializationTest.java | 2 +-
.../PartialConsumePipelinedResultTest.java | 2 +-
.../partition/PipelinedSubpartitionTest.java | 6 +--
.../network/partition/ResultPartitionTest.java | 6 +--
.../partition/SpillableSubpartitionTest.java | 40 ++++++++++----------
.../consumer/LocalInputChannelTest.java | 2 +-
.../consumer/RemoteInputChannelTest.java | 26 ++++++-------
.../io/network/util/TestConsumerCallback.java | 2 +-
.../network/util/TestSubpartitionConsumer.java | 2 +-
.../BackPressureStatsTrackerITCase.java | 4 +-
.../streaming/runtime/io/BufferSpiller.java | 2 +-
.../runtime/io/StreamInputProcessor.java | 4 +-
.../runtime/io/StreamTwoInputProcessor.java | 4 +-
.../io/BarrierBufferAlignmentLimitTest.java | 2 +-
.../io/BarrierBufferMassiveRandomTest.java | 2 +-
.../streaming/runtime/io/BarrierBufferTest.java | 2 +-
38 files changed, 134 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
index 9a78d0a..0c58f16 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriter.java
@@ -48,7 +48,7 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
addRequest(new BufferWriteRequest(this, buffer));
} catch (Throwable e) {
// if not added, we need to recycle here
- buffer.recycle();
+ buffer.recycleBuffer();
ExceptionUtils.rethrowIOException(e);
}
@@ -71,12 +71,12 @@ public class AsynchronousBufferFileWriter extends AsynchronousFileIOChannel<Buff
@Override
public void requestSuccessful(Buffer buffer) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
@Override
public void requestFailed(Buffer buffer, IOException e) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
index 495336c..a30961f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/SynchronousBufferFileReader.java
@@ -47,7 +47,7 @@ public class SynchronousBufferFileReader extends SynchronousFileIOChannel implem
hasReachedEndOfFile = reader.readBufferFromFileChannel(buffer);
}
else {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
index 29f2b6d..1ac5f75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -74,7 +74,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
if (result.isBufferConsumed()) {
final Buffer currentBuffer = currentRecordDeserializer.getCurrentBuffer();
- currentBuffer.recycle();
+ currentBuffer.recycleBuffer();
currentRecordDeserializer = null;
}
@@ -118,7 +118,7 @@ abstract class AbstractRecordReader<T extends IOReadableWritable> extends Abstra
for (RecordDeserializer<?> deserializer : recordDeserializers) {
Buffer buffer = deserializer.getCurrentBuffer();
if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
deserializer.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 39dbacc..de1cac6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -155,14 +155,14 @@ public class RecordWriter<T extends IOReadableWritable> {
}
// retain the buffer so that it can be recycled by each channel of targetPartition
- eventBuffer.retain();
+ eventBuffer.retainBuffer();
targetPartition.writeBuffer(eventBuffer, targetChannel);
}
}
} finally {
// we do not need to further retain the eventBuffer
// (it will be recycled after the last channel stops using it)
- eventBuffer.recycle();
+ eventBuffer.recycleBuffer();
}
}
@@ -192,7 +192,7 @@ public class RecordWriter<T extends IOReadableWritable> {
Buffer buffer = serializer.getCurrentBuffer();
if (buffer != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 3a66e53..454a9ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -57,13 +57,13 @@ public interface ResultPartitionWriter {
try {
for (int subpartition = 0; subpartition < getNumberOfSubpartitions(); subpartition++) {
// retain the buffer so that it can be recycled by each channel of targetPartition
- buffer.retain();
+ buffer.retainBuffer();
writeBuffer(buffer, subpartition);
}
} finally {
// we do not need to further retain the eventBuffer
// (it will be recycled after the last channel stops using it)
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 12acfdb..f1eeb69 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+
import java.nio.ByteBuffer;
/**
@@ -74,9 +76,9 @@ public interface Buffer {
* Releases this buffer once, i.e. reduces the reference count and recycles the buffer if the
* reference count reaches <tt>0</tt>.
*
- * @see #retain()
+ * @see #retainBuffer()
*/
- void recycle();
+ void recycleBuffer();
/**
* Returns whether this buffer has been recycled or not.
@@ -90,9 +92,9 @@ public interface Buffer {
*
* @return <tt>this</tt> instance (for chained calls)
*
- * @see #recycle()
+ * @see #recycleBuffer()
*/
- Buffer retain();
+ Buffer retainBuffer();
/**
* Returns the maximum size of the buffer, i.e. the capacity of the underlying {@link MemorySegment}.
@@ -149,6 +151,12 @@ public interface Buffer {
void setSize(int writerIndex);
/**
+ * Returns the number of readable bytes (same as <tt>{@link #getSize()} -
+ * {@link #getReaderIndex()}</tt>).
+ */
+ int readableBytes();
+
+ /**
* Gets a new {@link ByteBuffer} instance wrapping this buffer's readable bytes, i.e. between
* {@link #getReaderIndex()} and {@link #getSize()}.
*
@@ -171,4 +179,11 @@ public interface Buffer {
* @see #getNioBufferReadable()
*/
ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException;
+
+ /**
+ * Sets the buffer allocator for use in netty.
+ *
+ * @param allocator netty buffer allocator
+ */
+ void setAllocator(ByteBufAllocator allocator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 4486caa..28c5699 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -137,7 +137,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
- public void recycle() {
+ public void recycleBuffer() {
release();
}
@@ -147,7 +147,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
- public NetworkBuffer retain() {
+ public NetworkBuffer retainBuffer() {
return (NetworkBuffer) super.retain();
}
@@ -414,11 +414,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
return checkNotNull(allocator);
}
- /**
- * Sets the buffer allocator for use in netty.
- *
- * @param allocator netty buffer allocator
- */
+ @Override
public void setAllocator(ByteBufAllocator allocator) {
this.allocator = allocator;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
index 5160853..f05febc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyMessage.java
@@ -305,7 +305,7 @@ public abstract class NettyMessage {
throw new IOException(t);
}
finally {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 8e4d8cd..ba7bf22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -430,7 +430,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
finally {
if (!success) {
if (buffer != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
@@ -485,7 +485,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
finally {
if (!success) {
if (buffer != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index 41f87ae..58df4b6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -213,7 +213,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
}
} catch (Throwable t) {
if (next != null) {
- next.buffer().recycle();
+ next.buffer().recycleBuffer();
}
throw new IOException(t.getMessage(), t);
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 07a53fb..051f871 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -74,7 +74,7 @@ class PipelinedSubpartition extends ResultSubpartition {
synchronized (buffers) {
if (isFinished || isReleased) {
- buffer.recycle();
+ buffer.recycleBuffer();
return false;
}
@@ -133,7 +133,7 @@ class PipelinedSubpartition extends ResultSubpartition {
// Release all available buffers
Buffer buffer;
while ((buffer = buffers.poll()) != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
view = readView;
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 01c8bfc..aac8fb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -248,13 +248,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
final ResultSubpartition subpartition = subpartitions[subpartitionIndex];
// retain for buffer use after add() but also to have a simple path for recycle()
- buffer.retain();
+ buffer.retainBuffer();
success = subpartition.add(buffer);
} finally {
if (success) {
notifyPipelinedConsumers();
}
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
index fb31592..71753d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java
@@ -37,7 +37,7 @@ public interface ResultSubpartitionView {
* than the consumer or a spilled queue needs to read in more data.
*
* <p><strong>Important</strong>: The consumer has to make sure that each
- * buffer instance will eventually be recycled with {@link Buffer#recycle()}
+ * buffer instance will eventually be recycled with {@link Buffer#recycleBuffer()}
* after it has been consumed.
*/
@Nullable
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 827a577..e57e30a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -104,7 +104,7 @@ class SpillableSubpartition extends ResultSubpartition {
synchronized (buffers) {
if (isFinished || isReleased) {
- buffer.recycle();
+ buffer.recycleBuffer();
return false;
}
@@ -123,14 +123,14 @@ class SpillableSubpartition extends ResultSubpartition {
// Didn't return early => go to disk
try {
// retain buffer for updateStatistics() below
- spillWriter.writeBlock(buffer.retain());
+ spillWriter.writeBlock(buffer.retainBuffer());
synchronized (buffers) {
// See the note above, but only do this if the buffer was correctly added!
updateStatistics(buffer);
increaseBuffersInBacklog(buffer);
}
} finally {
- buffer.recycle();
+ buffer.recycleBuffer();
}
return true;
@@ -162,7 +162,7 @@ class SpillableSubpartition extends ResultSubpartition {
// Release all available buffers
for (Buffer buffer : buffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
buffers.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index a16e59a..279a023 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -172,7 +172,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
// we are never giving this buffer out in getNextBuffer(), so we need to clean it up
synchronized (buffers) {
if (nextBuffer != null) {
- nextBuffer.recycle();
+ nextBuffer.recycleBuffer();
nextBuffer = null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 9b0226a..0d633d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -246,7 +246,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
if (buffer.getRecycler() == this) {
exclusiveRecyclingSegments.add(buffer.getMemorySegment());
} else {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
@@ -350,7 +350,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
// Check the isReleased state outside synchronized block first to avoid
// deadlock with releaseAllResources running in parallel.
if (isReleased.get()) {
- buffer.recycle();
+ buffer.recycleBuffer();
return false;
}
@@ -361,7 +361,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
// Important: double check the isReleased state inside synchronized block, so there is no
// race condition when notifyBufferAvailable and releaseAllResources running in parallel.
if (isReleased.get() || bufferQueue.getAvailableBufferSize() >= numRequiredBuffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
return false;
}
@@ -517,7 +517,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
}
} finally {
if (!success) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
@@ -599,7 +599,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
exclusiveBuffers.add(buffer);
if (getAvailableBufferSize() > numRequiredBuffers) {
Buffer floatingBuffer = floatingBuffers.poll();
- floatingBuffer.recycle();
+ floatingBuffer.recycleBuffer();
return 0;
} else {
return 1;
@@ -635,7 +635,7 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
void releaseAll(List<MemorySegment> exclusiveSegments) {
Buffer buffer;
while ((buffer = floatingBuffers.poll()) != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
while ((buffer = exclusiveBuffers.poll()) != null) {
exclusiveSegments.add(buffer.getMemorySegment());
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
index 1f78cd9..bc4c42a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/disk/iomanager/AsynchronousBufferFileWriterTest.java
@@ -90,7 +90,7 @@ public class AsynchronousBufferFileWriterTest {
writer.writeBlock(buffer);
} finally {
if (!buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
Assert.fail("buffer not recycled");
}
assertEquals("Shouln't increment number of outstanding requests.", 0, writer.getNumberOfOutstandingRequests());
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
index f79ab0f..aa14860 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java
@@ -104,7 +104,7 @@ public class EventSerializerTest {
.fromBuffer(serializedEvent, cl);
assertEquals(EndOfPartitionEvent.INSTANCE, event);
} finally {
- serializedEvent.recycle();
+ serializedEvent.recycleBuffer();
}
}
@@ -156,7 +156,7 @@ public class EventSerializerTest {
final ClassLoader cl = getClass().getClassLoader();
return EventSerializer.isEvent(serializedEvent, eventClass, cl);
} finally {
- serializedEvent.recycle();
+ serializedEvent.recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index ded1817..9e281d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -193,7 +193,7 @@ public class RecordWriterTest {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
Buffer buffer = (Buffer) invocation.getArguments()[0];
- buffer.recycle();
+ buffer.recycleBuffer();
throw new ExpectedTestException();
}
@@ -478,7 +478,7 @@ public class RecordWriterTest {
} else {
// is event:
AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
- buffer.recycle(); // the buffer is not needed anymore
+ buffer.recycleBuffer(); // the buffer is not needed anymore
Integer targetChannel = (Integer) invocationOnMock.getArguments()[1];
queues[targetChannel].add(new BufferOrEvent(event, targetChannel));
}
@@ -530,7 +530,7 @@ public class RecordWriterTest {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- ((Buffer) invocation.getArguments()[0]).recycle();
+ ((Buffer) invocation.getArguments()[0]).recycleBuffer();
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
index d15aba6..14eb6cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferPoolFactoryTest.java
@@ -138,7 +138,7 @@ public class BufferPoolFactoryTest {
assertNull(bufferPool2.requestBuffer());
// as soon as one excess buffer of bufferPool1 is recycled, it should be available for bufferPool2
- buffers.remove(0).recycle();
+ buffers.remove(0).recycleBuffer();
// recycle returns the excess buffer to the network buffer pool
assertEquals(1, networkBufferPool.getNumberOfAvailableMemorySegments());
// verify the number of buffers taken from the pools
@@ -158,7 +158,7 @@ public class BufferPoolFactoryTest {
bufferPool2.bestEffortGetNumOfUsedBuffers() + bufferPool2.getNumberOfAvailableMemorySegments());
} finally {
for (Buffer buffer : buffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
if (bufferPool1 != null) {
bufferPool1.lazyDestroy();
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
index 4eb568a..b04286e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
@@ -108,7 +108,7 @@ public class LocalBufferPoolTest {
}
for (Buffer buffer : requests) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
@@ -142,7 +142,7 @@ public class LocalBufferPoolTest {
// Recycle should return buffers to memory segment pool
for (Buffer buffer : requests) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
@@ -166,12 +166,12 @@ public class LocalBufferPoolTest {
assertEquals(numBuffers, getNumRequestedFromMemorySegmentPool());
for (int i = 1; i < numBuffers / 2; i++) {
- requests.remove(0).recycle();
+ requests.remove(0).recycleBuffer();
assertEquals(numBuffers - i, getNumRequestedFromMemorySegmentPool());
}
for (Buffer buffer : requests) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
@@ -188,7 +188,7 @@ public class LocalBufferPoolTest {
// Recycle all
for (Buffer buffer : requests) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
assertEquals(numBuffers, localBufferPool.getNumberOfAvailableMemorySegments());
@@ -227,13 +227,13 @@ public class LocalBufferPoolTest {
// Recycle the first buffer to notify both of the above listeners once
// and the twoTimesListener will be added into the registeredListeners
// queue of buffer pool again
- available1.recycle();
+ available1.recycleBuffer();
verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
verify(twoTimesListener, times(1)).notifyBufferAvailable(any(Buffer.class));
// Recycle the second buffer to only notify the twoTimesListener
- available2.recycle();
+ available2.recycleBuffer();
verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));
verify(twoTimesListener, times(2)).notifyBufferAvailable(any(Buffer.class));
@@ -255,7 +255,7 @@ public class LocalBufferPoolTest {
localBufferPool.lazyDestroy();
- available.recycle();
+ available.recycleBuffer();
verify(listener, times(1)).notifyBufferDestroyed();
}
@@ -333,7 +333,7 @@ public class LocalBufferPoolTest {
List<Buffer> requestedBuffers = f.get(60, TimeUnit.SECONDS);
for (Buffer buffer : requestedBuffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
@@ -354,7 +354,7 @@ public class LocalBufferPoolTest {
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
assertNull(localBufferPool.requestBuffer());
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
- buffer1.recycle();
+ buffer1.recycleBuffer();
assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
// check max number of buffers:
@@ -366,9 +366,9 @@ public class LocalBufferPoolTest {
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
assertNull(localBufferPool.requestBuffer());
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
- buffer1.recycle();
+ buffer1.recycleBuffer();
assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
- buffer2.recycle();
+ buffer2.recycleBuffer();
assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
// try to set too large buffer size:
@@ -380,9 +380,9 @@ public class LocalBufferPoolTest {
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
assertNull(localBufferPool.requestBuffer());
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
- buffer1.recycle();
+ buffer1.recycleBuffer();
assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
- buffer2.recycle();
+ buffer2.recycleBuffer();
assertEquals(2, localBufferPool.getNumberOfAvailableMemorySegments());
// decrease size again
@@ -391,7 +391,7 @@ public class LocalBufferPoolTest {
assertNotNull(buffer1 = localBufferPool.requestBuffer());
assertEquals(0, localBufferPool.getNumberOfAvailableMemorySegments());
assertNull(localBufferPool.requestBuffer());
- buffer1.recycle();
+ buffer1.recycleBuffer();
assertEquals(1, localBufferPool.getNumberOfAvailableMemorySegments());
}
@@ -410,7 +410,7 @@ public class LocalBufferPoolTest {
@Override
public boolean notifyBufferAvailable(Buffer buffer) {
times++;
- buffer.recycle();
+ buffer.recycleBuffer();
return times < notificationTimes;
}
@@ -436,7 +436,7 @@ public class LocalBufferPoolTest {
try {
for (int i = 0; i < numBuffersToRequest; i++) {
Buffer buffer = bufferProvider.requestBufferBlocking();
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
index 4d7648a..739a4d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPoolTest.java
@@ -146,7 +146,7 @@ public class NetworkBufferPoolTest {
// the recycled buffers should go to the global pool
for (Buffer b : buffers) {
- b.recycle();
+ b.recycleBuffer();
}
assertEquals(globalPool.getTotalNumberOfMemorySegments(), globalPool.getNumberOfAvailableMemorySegments());
@@ -284,7 +284,7 @@ public class NetworkBufferPoolTest {
}
for (Buffer buffer : buffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
});
bufferRecycler.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index ed5fff8..53ecda3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -67,7 +67,7 @@ public class NettyMessageSerializationTest {
NettyMessage.BufferResponse actual = encodeAndDecode(expected);
// Verify recycle has been called on buffer instance
- verify(buffer, times(1)).recycle();
+ verify(buffer, times(1)).recycleBuffer();
final ByteBuf retainedSlice = actual.getNettyBuffer();
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 9481d57..68deec7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -142,7 +142,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
InputGate gate = getEnvironment().getInputGate(0);
Buffer buffer = gate.getNextBufferOrEvent().getBuffer();
if (buffer != null) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 56ed622..40be8df 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -237,7 +237,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
numberOfBuffers++;
- buffer.recycle();
+ buffer.recycleBuffer();
}
@Override
@@ -311,11 +311,11 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
} finally {
buffer1Recycled = buffer1.isRecycled();
if (!buffer1Recycled) {
- buffer1.recycle();
+ buffer1.recycleBuffer();
}
buffer2Recycled = buffer2.isRecycled();
if (!buffer2Recycled) {
- buffer2.recycle();
+ buffer2.recycleBuffer();
}
}
if (!buffer1Recycled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 5f5f459..9c02b65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -128,7 +128,7 @@ public class ResultPartitionTest {
} finally {
if (!buffer.isRecycled()) {
Assert.fail("buffer not recycled");
- buffer.recycle();
+ buffer.recycleBuffer();
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
@@ -162,7 +162,7 @@ public class ResultPartitionTest {
} finally {
if (!buffer.isRecycled()) {
Assert.fail("buffer not recycled");
- buffer.recycle();
+ buffer.recycleBuffer();
}
// should not have notified either
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
@@ -224,7 +224,7 @@ public class ResultPartitionTest {
assertFalse("buffer should not be recycled (still in the queue)", buffer.isRecycled());
} finally {
if (!buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
// should have been notified for pipelined partitions
if (pipelined.isPipelined()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 9664945..4a03321 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -194,8 +194,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpillableSubpartition partition = createSubpartition();
Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
- buffer.retain();
- buffer.retain();
+ buffer.retainBuffer();
+ buffer.retainBuffer();
partition.add(buffer);
partition.add(buffer);
@@ -243,7 +243,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertNotSame(buffer, read);
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
read = reader.getNextBuffer();
@@ -252,7 +252,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertNotSame(buffer, read);
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
read = reader.getNextBuffer();
@@ -261,7 +261,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertNotSame(buffer, read);
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
// End of partition
@@ -272,7 +272,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(EndOfPartitionEvent.class,
EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
// finally check that the buffer has been freed after a successful (or failed) write
@@ -292,8 +292,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpillableSubpartition partition = createSubpartition();
Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
- buffer.retain();
- buffer.retain();
+ buffer.retainBuffer();
+ buffer.retainBuffer();
partition.add(buffer);
partition.add(buffer);
@@ -318,7 +318,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertSame(buffer, read.buffer());
assertEquals(2, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertEquals(2, listener.getNumNotifiedBuffers());
assertFalse(buffer.isRecycled());
@@ -338,7 +338,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(1, partition.getBuffersInBacklog());
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertSame(buffer, read.buffer());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
// now the buffer may be freed, depending on the timing of the write operation
// -> let's do this check at the end of the test (to save some time)
@@ -348,7 +348,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
assertNotSame(buffer, read.buffer());
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
// End of partition
@@ -359,7 +359,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(EndOfPartitionEvent.class,
EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
assertFalse(read.buffer().isRecycled());
- read.buffer().recycle();
+ read.buffer().recycleBuffer();
assertTrue(read.buffer().isRecycled());
// finally check that the buffer has been freed after a successful (or failed) write
@@ -408,7 +408,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
partition.add(buffer);
} finally {
if (!buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
Assert.fail("buffer not recycled");
}
}
@@ -454,7 +454,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
} finally {
bufferRecycled = buffer.isRecycled();
if (!bufferRecycled) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
if (!bufferRecycled) {
@@ -483,7 +483,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
ioManager.shutdown();
bufferRecycled = buffer.isRecycled();
if (!bufferRecycled) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
if (bufferRecycled) {
@@ -546,10 +546,10 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
} finally {
ioManager.shutdown();
if (!buffer1.isRecycled()) {
- buffer1.recycle();
+ buffer1.recycleBuffer();
}
if (!buffer2.isRecycled()) {
- buffer2.recycle();
+ buffer2.recycleBuffer();
}
}
// note: a view requires a finished partition which has an additional EndOfPartitionEvent
@@ -577,7 +577,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
ioManager.shutdown();
bufferRecycled = buffer.isRecycled();
if (!bufferRecycled) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
}
if (!bufferRecycled) {
@@ -666,11 +666,11 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
} finally {
buffer1Recycled = buffer1.isRecycled();
if (!buffer1Recycled) {
- buffer1.recycle();
+ buffer1.recycleBuffer();
}
buffer2Recycled = buffer2.isRecycled();
if (!buffer2Recycled) {
- buffer2.recycle();
+ buffer2.recycleBuffer();
}
}
if (!buffer1Recycled) {
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 16cd90d..2f68418 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -518,7 +518,7 @@ public class LocalInputChannelTest {
BufferOrEvent boe;
while ((boe = inputGate.getNextBufferOrEvent()) != null) {
if (boe.isBuffer()) {
- boe.getBuffer().recycle();
+ boe.getBuffer().recycleBuffer();
// Check that we don't receive too many buffers
if (++numberOfBuffersPerChannel[boe.getChannelIndex()]
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 469aa97..8c8b96a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -72,7 +72,7 @@ public class RemoteInputChannelTest {
final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
// The test
- inputChannel.onBuffer(buffer.retain(), 0, -1);
+ inputChannel.onBuffer(buffer.retainBuffer(), 0, -1);
// This does not yet throw the exception, but sets the error at the channel.
inputChannel.onBuffer(buffer, 29, -1);
@@ -119,7 +119,7 @@ public class RemoteInputChannelTest {
for (int j = 0; j < 128; j++) {
// this is the same buffer over and over again which will be
// recycled by the RemoteInputChannel
- inputChannel.onBuffer(buffer.retain(), j, -1);
+ inputChannel.onBuffer(buffer.retainBuffer(), j, -1);
}
if (inputChannel.isReleased()) {
@@ -155,7 +155,7 @@ public class RemoteInputChannelTest {
finally {
executor.shutdown();
assertFalse(buffer.isRecycled());
- buffer.recycle();
+ buffer.recycleBuffer();
assertTrue(buffer.isRecycled());
}
}
@@ -372,7 +372,7 @@ public class RemoteInputChannelTest {
0, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one floating buffer
- floatingBufferQueue.poll().recycle();
+ floatingBufferQueue.poll().recycleBuffer();
// Assign the floating buffer to the listener and the channel is still waiting for more floating buffers
verify(bufferPool, times(15)).requestBuffer();
@@ -385,7 +385,7 @@ public class RemoteInputChannelTest {
0, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one more floating buffer
- floatingBufferQueue.poll().recycle();
+ floatingBufferQueue.poll().recycleBuffer();
// Assign the floating buffer to the listener and the channel is still waiting for more floating buffers
verify(bufferPool, times(15)).requestBuffer();
@@ -411,7 +411,7 @@ public class RemoteInputChannelTest {
0, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one exclusive buffer
- exclusiveBuffer.recycle();
+ exclusiveBuffer.recycleBuffer();
// The exclusive buffer is returned to the channel directly
verify(bufferPool, times(15)).requestBuffer();
@@ -474,7 +474,7 @@ public class RemoteInputChannelTest {
0, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one floating buffer
- floatingBuffer.recycle();
+ floatingBuffer.recycleBuffer();
// The floating buffer is returned to local buffer directly because the channel is not waiting
// for floating buffers
@@ -488,7 +488,7 @@ public class RemoteInputChannelTest {
1, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one exclusive buffer
- exclusiveBuffer.recycle();
+ exclusiveBuffer.recycleBuffer();
// Return one extra floating buffer to the local pool because the number of available buffers
// already equals to required buffers
@@ -566,7 +566,7 @@ public class RemoteInputChannelTest {
0, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one exclusive buffer
- exclusiveBuffer.recycle();
+ exclusiveBuffer.recycleBuffer();
// Return one extra floating buffer to the local pool because the number of available buffers
// is more than required buffers
@@ -580,7 +580,7 @@ public class RemoteInputChannelTest {
1, bufferPool.getNumberOfAvailableMemorySegments());
// Recycle one floating buffer
- floatingBuffer.recycle();
+ floatingBuffer.recycleBuffer();
// The floating buffer is returned to local pool directly because the channel is not waiting for
// floating buffers
@@ -654,7 +654,7 @@ public class RemoteInputChannelTest {
// Recycle three floating buffers to trigger notify buffer available
for (Buffer buffer : floatingBuffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
verify(channel1, times(1)).notifyBufferAvailable(any(Buffer.class));
@@ -920,7 +920,7 @@ public class RemoteInputChannelTest {
@Override
public Void call() throws Exception {
for (Buffer buffer : exclusiveBuffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
return null;
@@ -948,7 +948,7 @@ public class RemoteInputChannelTest {
@Override
public Void call() throws Exception {
for (Buffer buffer : floatingBuffers) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
index 0d66f13..f07c5d7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestConsumerCallback.java
@@ -69,7 +69,7 @@ public interface TestConsumerCallback {
public void onBuffer(Buffer buffer) {
super.onBuffer(buffer);
- buffer.recycle();
+ buffer.recycleBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
index 37137ff..b4bdd3e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestSubpartitionConsumer.java
@@ -109,7 +109,7 @@ public class TestSubpartitionConsumer implements Callable<Boolean>, BufferAvaila
callback.onEvent(event);
- bufferAndBacklog.buffer().recycle();
+ bufferAndBacklog.buffer().recycleBuffer();
if (event.getClass() == EndOfPartitionEvent.class) {
subpartitionView.notifySubpartitionConsumed();
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
index 7c9e80b..b1a4cd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -219,7 +219,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
// 2) Release all buffers and let the tasks grab one
//
for (Buffer buf : buffers) {
- buf.recycle();
+ buf.recycleBuffer();
Assert.assertTrue(buf.isRecycled());
}
@@ -322,7 +322,7 @@ public class BackPressureStatsTrackerITCase extends TestLogger {
while (true) {
Buffer buffer = testBufferPool.requestBufferBlocking();
// Got a buffer, yay!
- buffer.recycle();
+ buffer.recycleBuffer();
new CountDownLatch(1).await();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 7f6e875..33aac7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -151,7 +151,7 @@ public class BufferSpiller {
}
finally {
if (boe.isBuffer()) {
- boe.getBuffer().recycle();
+ boe.getBuffer().recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index cf8f452..9f526a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -188,7 +188,7 @@ public class StreamInputProcessor<IN> {
DeserializationResult result = currentRecordDeserializer.getNextRecord(deserializationDelegate);
if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
@@ -273,7 +273,7 @@ public class StreamInputProcessor<IN> {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
Buffer buffer = deserializer.getCurrentBuffer();
if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
deserializer.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index b99ad53..a3d9236 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -225,7 +225,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
}
if (result.isBufferConsumed()) {
- currentRecordDeserializer.getCurrentBuffer().recycle();
+ currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
currentRecordDeserializer = null;
}
@@ -338,7 +338,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
for (RecordDeserializer<?> deserializer : recordDeserializers) {
Buffer buffer = deserializer.getCurrentBuffer();
if (buffer != null && !buffer.isRecycled()) {
- buffer.recycle();
+ buffer.recycleBuffer();
}
deserializer.clear();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index cca6fb3..9a59737 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -282,7 +282,7 @@ public class BarrierBufferAlignmentLimitTest {
buf.setSize(size);
// retain an additional time so it does not get disposed after being read by the input gate
- buf.retain();
+ buf.retainBuffer();
return new BufferOrEvent(buf, channel);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
index 090e44a..146be75 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferMassiveRandomTest.java
@@ -67,7 +67,7 @@ public class BarrierBufferMassiveRandomTest {
for (int i = 0; i < 2000000; i++) {
BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
if (boe.isBuffer()) {
- boe.getBuffer().recycle();
+ boe.getBuffer().recycleBuffer();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/db440f24/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index c7c9df2..4a03445 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -1411,7 +1411,7 @@ public class BarrierBufferTest {
buf.setSize(size);
// retain an additional time so it does not get disposed after being read by the input gate
- buf.retain();
+ buf.retainBuffer();
return new BufferOrEvent(buf, channel);
}
[2/2] flink git commit: [FLINK-8395][network] add a read-only sliced
ByteBuf implementation based on NetworkBuffer
Posted by sr...@apache.org.
[FLINK-8395][network] add a read-only sliced ByteBuf implementation based on NetworkBuffer
This closes #5288.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/665347cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/665347cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/665347cf
Branch: refs/heads/master
Commit: 665347cf867f0b807fe9d4787b5ef42b0581b510
Parents: db440f2
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jan 9 22:57:18 2018 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jan 18 15:35:25 2018 +0100
----------------------------------------------------------------------
.../flink/runtime/io/network/buffer/Buffer.java | 24 ++
.../io/network/buffer/NetworkBuffer.java | 10 +
.../buffer/ReadOnlySlicedNetworkBuffer.java | 199 +++++++++++
.../runtime/io/network/buffer/BufferTest.java | 78 ++++-
.../buffer/ReadOnlySlicedBufferTest.java | 326 +++++++++++++++++++
5 files changed, 629 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index f1eeb69..3c9933e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -97,6 +97,30 @@ public interface Buffer {
Buffer retainBuffer();
/**
+ * Returns a read-only slice of this buffer's readable bytes, i.e. between
+ * {@link #getReaderIndex()} and {@link #getSize()}.
+ *
+ * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+ * shared but the slice is not {@link #retainBuffer() retained} automatically.
+ *
+ * @return a read-only sliced buffer
+ */
+ Buffer readOnlySlice();
+
+ /**
+ * Returns a read-only slice of this buffer.
+ *
+ * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+ * shared but the slice is not {@link #retainBuffer() retained} automatically.
+ *
+ * @param index the index to start from
+ * @param length the length of the slice
+ *
+ * @return a read-only sliced buffer
+ */
+ Buffer readOnlySlice(int index, int length);
+
+ /**
* Returns the maximum size of the buffer, i.e. the capacity of the underlying {@link MemorySegment}.
*
* @return size of the buffer
http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 28c5699..4ca50db 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -152,6 +152,16 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
}
@Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+ return readOnlySlice(readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+ return new ReadOnlySlicedNetworkBuffer(this, index, length);
+ }
+
+ @Override
protected void deallocate() {
recycler.recycle(memorySegment);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
new file mode 100644
index 0000000..f268314
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ReadOnlyByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.SlicedByteBuf;
+
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+
+/**
+ * Minimal best-effort read-only sliced {@link Buffer} implementation wrapping a
+ * {@link NetworkBuffer}'s sub-region based on <tt>io.netty.buffer.SlicedByteBuf</tt> and
+ * <tt>io.netty.buffer.ReadOnlyByteBuf</tt>.
+ *
+ * <p><strong>BEWARE:</strong> We do not guarantee to block every operation that is able to write
+ * data, but all returned data structures should be handled as if they were read-only!
+ */
+public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implements Buffer {
+
+ /**
+ * Creates a buffer which shares the memory segment of the given buffer and exposes the given
+ * sub-region only.
+ *
+ * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+ * shared but the slice is not {@link #retainBuffer() retained} automatically.
+ *
+ * @param buffer the buffer to derive from
+ * @param index the index to start from
+ * @param length the length of the slice
+ */
+ ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
+ super(new SlicedByteBuf(buffer, index, length));
+ }
+
+ /**
+ * Creates a buffer which shares the memory segment of the given buffer and exposes the given
+ * sub-region only.
+ *
+ * <p>Reader and writer indices as well as markers are not shared. Reference counters are
+ * shared but the slice is not {@link #retainBuffer() retained} automatically.
+ *
+ * @param buffer the buffer to derive from
+ * @param index the index to start from
+ * @param length the length of the slice
+ */
+ private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length) {
+ super(new SlicedByteBuf(buffer, index, length));
+ }
+
+ @Override
+ public ByteBuf unwrap() {
+ return super.unwrap().unwrap();
+ }
+
+ @Override
+ public boolean isBuffer() {
+ return ((Buffer) unwrap()).isBuffer();
+ }
+
+ @Override
+ public void tagAsEvent() {
+ throw new ReadOnlyBufferException();
+ }
+
+ /**
+ * Returns the underlying memory segment.
+ *
+ * <p><strong>BEWARE:</strong> Although we cannot set the memory segment read-only, it should be
+ * handled as if it was!
+ *
+ * @return the memory segment backing this buffer
+ */
+ @Override
+ public MemorySegment getMemorySegment() {
+ return ((Buffer) unwrap()).getMemorySegment();
+ }
+
+ @Override
+ public BufferRecycler getRecycler() {
+ return ((Buffer) unwrap()).getRecycler();
+ }
+
+ @Override
+ public void recycleBuffer() {
+ ((Buffer) unwrap()).recycleBuffer();
+ }
+
+ @Override
+ public boolean isRecycled() {
+ return ((Buffer) unwrap()).isRecycled();
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer retainBuffer() {
+ ((Buffer) unwrap()).retainBuffer();
+ return this;
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice() {
+ return readOnlySlice(readerIndex(), readableBytes());
+ }
+
+ @Override
+ public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) {
+ return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length);
+ }
+
+ @Override
+ public int getMaxCapacity() {
+ return maxCapacity();
+ }
+
+ @Override
+ public int getReaderIndex() {
+ return readerIndex();
+ }
+
+ @Override
+ public void setReaderIndex(int readerIndex) throws IndexOutOfBoundsException {
+ readerIndex(readerIndex);
+ }
+
+ @Override
+ public int getSizeUnsafe() {
+ return writerIndex();
+ }
+
+ @Override
+ public int getSize() {
+ return writerIndex();
+ }
+
+ @Override
+ public void setSize(int writerIndex) {
+ writerIndex(writerIndex);
+ }
+
+ @Override
+ public ByteBuffer getNioBufferReadable() {
+ return nioBuffer();
+ }
+
+ @Override
+ public ByteBuffer getNioBuffer(int index, int length) throws IndexOutOfBoundsException {
+ return nioBuffer(index, length);
+ }
+
+ @Override
+ public ByteBuffer nioBuffer(int index, int length) {
+ return super.nioBuffer(index, length).asReadOnlyBuffer();
+ }
+
+ @Override
+ public boolean isWritable() {
+ return false;
+ }
+
+ @Override
+ public boolean isWritable(int numBytes) {
+ return false;
+ }
+
+ @Override
+ public ByteBuf ensureWritable(int minWritableBytes) {
+ // note: ReadOnlyByteBuf allows this but in most cases this does not make sense
+ if (minWritableBytes != 0) {
+ throw new ReadOnlyBufferException();
+ }
+ return this;
+ }
+
+ @Override
+ public void setAllocator(ByteBufAllocator allocator) {
+ ((Buffer) unwrap()).setAllocator(allocator);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index f59fa9f..90516e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -166,13 +166,13 @@ public class BufferTest extends AbstractByteBufTest {
}
/**
- * Tests that {@link NetworkBuffer#recycle()} and {@link NetworkBuffer#isRecycled()} are coupled
- * and are also consistent with {@link NetworkBuffer#refCnt()}.
+ * Tests that {@link NetworkBuffer#recycleBuffer()} and {@link NetworkBuffer#isRecycled()} are
+ * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
*/
private static void testRecycleBuffer(boolean isBuffer) {
NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
assertFalse(buffer.isRecycled());
- buffer.recycle();
+ buffer.recycleBuffer();
assertTrue(buffer.isRecycled());
assertEquals(0, buffer.refCnt());
}
@@ -188,18 +188,74 @@ public class BufferTest extends AbstractByteBufTest {
}
/**
- * Tests that {@link NetworkBuffer#retain()} and {@link NetworkBuffer#isRecycled()} are coupled
- * and are also consistent with {@link NetworkBuffer#refCnt()}.
+ * Tests that {@link NetworkBuffer#retainBuffer()} and {@link NetworkBuffer#isRecycled()} are
+ * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
*/
private static void testRetainBuffer(boolean isBuffer) {
NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
assertFalse(buffer.isRecycled());
- buffer.retain();
+ buffer.retainBuffer();
assertFalse(buffer.isRecycled());
assertEquals(2, buffer.refCnt());
}
@Test
+ public void testDataBufferCreateSlice1() {
+ testCreateSlice1(true);
+ }
+
+ @Test
+ public void testEventBufferCreateSlice1() {
+ testCreateSlice1(false);
+ }
+
+ private static void testCreateSlice1(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.setSize(10); // fake some data
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ assertSame(buffer, slice.unwrap());
+
+ // slice indices should be independent:
+ buffer.setSize(8);
+ buffer.setReaderIndex(2);
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ }
+
+ @Test
+ public void testDataBufferCreateSlice2() {
+ testCreateSlice2(true);
+ }
+
+ @Test
+ public void testEventBufferCreateSlice2() {
+ testCreateSlice2(false);
+ }
+
+ private static void testCreateSlice2(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.setSize(2); // fake some data
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 10);
+
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ assertSame(buffer, slice.unwrap());
+
+ // slice indices should be independent:
+ buffer.setSize(8);
+ buffer.setReaderIndex(2);
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ }
+
+ @Test
public void testDataBufferGetMaxCapacity() {
testGetMaxCapacity(true);
}
@@ -330,7 +386,10 @@ public class BufferTest extends AbstractByteBufTest {
@Test
public void testGetNioBufferReadableThreadSafe() {
NetworkBuffer buffer = newBuffer(1024, 1024);
+ testGetNioBufferReadableThreadSafe(buffer);
+ }
+ static void testGetNioBufferReadableThreadSafe(Buffer buffer) {
ByteBuffer buf1 = buffer.getNioBufferReadable();
ByteBuffer buf2 = buffer.getNioBufferReadable();
@@ -381,9 +440,12 @@ public class BufferTest extends AbstractByteBufTest {
@Test
public void testGetNioBufferThreadSafe() {
NetworkBuffer buffer = newBuffer(1024, 1024);
+ testGetNioBufferThreadSafe(buffer, 10);
+ }
- ByteBuffer buf1 = buffer.getNioBuffer(0, 10);
- ByteBuffer buf2 = buffer.getNioBuffer(0, 10);
+ static void testGetNioBufferThreadSafe(Buffer buffer, int length) {
+ ByteBuffer buf1 = buffer.getNioBuffer(0, length);
+ ByteBuffer buf2 = buffer.getNioBuffer(0, length);
assertNotNull(buf1);
assertNotNull(buf2);
http://git-wip-us.apache.org/repos/asf/flink/blob/665347cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
new file mode 100644
index 0000000..c190bcf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ReadOnlyBufferException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link ReadOnlySlicedNetworkBuffer}.
+ */
+public class ReadOnlySlicedBufferTest {
+ private static final int BUFFER_SIZE = 1024;
+ private static final int DATA_SIZE = 10;
+
+ private NetworkBuffer buffer;
+
+ @Before
+ public void setUp() throws Exception {
+ final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
+ buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, DATA_SIZE);
+ buffer.setSize(DATA_SIZE);
+ }
+
+ @Test
+ public void testForwardsIsBuffer() throws IOException {
+ assertEquals(buffer.isBuffer(), buffer.readOnlySlice().isBuffer());
+ assertEquals(buffer.isBuffer(), buffer.readOnlySlice(1, 2).isBuffer());
+ NetworkBuffer eventBuffer = (NetworkBuffer) EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+ assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice().isBuffer());
+ assertEquals(eventBuffer.isBuffer(), eventBuffer.readOnlySlice(1, 2).isBuffer());
+ }
+
+ @Test(expected = ReadOnlyBufferException.class)
+ public void testTagAsEventThrows1() {
+ buffer.readOnlySlice().tagAsEvent();
+ }
+
+ @Test(expected = ReadOnlyBufferException.class)
+ public void testTagAsEventThrows2() {
+ buffer.readOnlySlice(1, 2).tagAsEvent();
+ }
+
+ @Test
+ public void testForwardsGetMemorySegment() {
+ assertSame(buffer.getMemorySegment(), buffer.readOnlySlice().getMemorySegment());
+ assertSame(buffer.getMemorySegment(), buffer.readOnlySlice(1, 2).getMemorySegment());
+ }
+
+ @Test
+ public void testForwardsGetRecycler() {
+ assertSame(buffer.getRecycler(), buffer.readOnlySlice().getRecycler());
+ assertSame(buffer.getRecycler(), buffer.readOnlySlice(1, 2).getRecycler());
+ }
+
+ /**
+ * Tests forwarding of both {@link ReadOnlySlicedNetworkBuffer#recycleBuffer()} and
+ * {@link ReadOnlySlicedNetworkBuffer#isRecycled()}.
+ */
+ @Test
+ public void testForwardsRecycleBuffer1() {
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+ assertFalse(slice.isRecycled());
+ slice.recycleBuffer();
+ assertTrue(slice.isRecycled());
+ assertTrue(buffer.isRecycled());
+ }
+
+ /**
+ * Tests forwarding of both {@link ReadOnlySlicedNetworkBuffer#recycleBuffer()} and
+ * {@link ReadOnlySlicedNetworkBuffer#isRecycled()}.
+ */
+ @Test
+ public void testForwardsRecycleBuffer2() {
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 2);
+ assertFalse(slice.isRecycled());
+ slice.recycleBuffer();
+ assertTrue(slice.isRecycled());
+ assertTrue(buffer.isRecycled());
+ }
+
+ /**
+ * Tests forwarding of {@link ReadOnlySlicedNetworkBuffer#retainBuffer()} by verifying that the
+ * reference counts of the slice and the wrapped buffer stay in sync.
+ */
+ @Test
+ public void testForwardsRetainBuffer1() {
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+ assertEquals(buffer.refCnt(), slice.refCnt());
+ slice.retainBuffer();
+ assertEquals(buffer.refCnt(), slice.refCnt());
+ }
+
+ /**
+ * Tests forwarding of {@link ReadOnlySlicedNetworkBuffer#retainBuffer()} by verifying that the
+ * reference counts of the slice and the wrapped buffer stay in sync.
+ */
+ @Test
+ public void testForwardsRetainBuffer2() {
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 2);
+ assertEquals(buffer.refCnt(), slice.refCnt());
+ slice.retainBuffer();
+ assertEquals(buffer.refCnt(), slice.refCnt());
+ }
+
+ @Test
+ public void testCreateSlice1() {
+ ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+ ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
+ ByteBuf unwrap = slice2.unwrap();
+ assertSame(buffer, unwrap);
+ }
+
+ @Test
+ public void testCreateSlice2() {
+ ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+ ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
+ ByteBuf unwrap = slice2.unwrap();
+ assertSame(buffer, unwrap);
+ }
+
+ @Test
+ public void testCreateSlice3() {
+ ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 2);
+ ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
+ ByteBuf unwrap = slice2.unwrap();
+ assertSame(buffer, unwrap);
+ }
+
+ @Test
+ public void testCreateSlice4() {
+ ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 5);
+ ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
+ ByteBuf unwrap = slice2.unwrap();
+ assertSame(buffer, unwrap);
+ }
+
+ @Test
+ public void testGetMaxCapacity() {
+ assertEquals(DATA_SIZE, buffer.readOnlySlice().getMaxCapacity());
+ assertEquals(2, buffer.readOnlySlice(1, 2).getMaxCapacity());
+ }
+
+ /**
+ * Tests the independence of the reader index via
+ * {@link ReadOnlySlicedNetworkBuffer#setReaderIndex(int)} and
+ * {@link ReadOnlySlicedNetworkBuffer#getReaderIndex()}.
+ */
+ @Test
+ public void testGetSetReaderIndex1() {
+ testGetSetReaderIndex(buffer.readOnlySlice());
+ }
+
+ /**
+ * Tests the independence of the reader index via
+ * {@link ReadOnlySlicedNetworkBuffer#setReaderIndex(int)} and
+ * {@link ReadOnlySlicedNetworkBuffer#getReaderIndex()}.
+ */
+ @Test
+ public void testGetSetReaderIndex2() {
+ testGetSetReaderIndex(buffer.readOnlySlice(1, 2));
+ }
+
+ private void testGetSetReaderIndex(ReadOnlySlicedNetworkBuffer slice) {
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(0, slice.getReaderIndex());
+ slice.setReaderIndex(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(1, slice.getReaderIndex());
+ }
+
+ /**
+ * Tests the independence of the writer index via
+ * {@link ReadOnlySlicedNetworkBuffer#setSize(int)},
+ * {@link ReadOnlySlicedNetworkBuffer#getSize()}, and
+ * {@link ReadOnlySlicedNetworkBuffer#getSizeUnsafe()}.
+ */
+ @Test
+ public void testGetSetSize1() {
+ testGetSetSize(buffer.readOnlySlice(), DATA_SIZE);
+ }
+
+ /**
+ * Tests the independence of the writer index via
+ * {@link ReadOnlySlicedNetworkBuffer#setSize(int)},
+ * {@link ReadOnlySlicedNetworkBuffer#getSize()}, and
+ * {@link ReadOnlySlicedNetworkBuffer#getSizeUnsafe()}.
+ */
+ @Test
+ public void testGetSetSize2() {
+ testGetSetSize(buffer.readOnlySlice(1, 2), 2);
+ }
+
+ private void testGetSetSize(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+ assertEquals(DATA_SIZE, buffer.getSize());
+ assertEquals(DATA_SIZE, buffer.getSizeUnsafe());
+ assertEquals(sliceSize, slice.getSize());
+ assertEquals(sliceSize, slice.getSizeUnsafe());
+ buffer.setSize(DATA_SIZE + 1);
+ assertEquals(DATA_SIZE + 1, buffer.getSize());
+ assertEquals(DATA_SIZE + 1, buffer.getSizeUnsafe());
+ assertEquals(sliceSize, slice.getSize());
+ assertEquals(sliceSize, slice.getSizeUnsafe());
+ }
+
+ @Test
+ public void testReadableBytes() {
+ assertEquals(buffer.readableBytes(), buffer.readOnlySlice().readableBytes());
+ assertEquals(2, buffer.readOnlySlice(1, 2).readableBytes());
+ }
+
+ @Test
+ public void testGetNioBufferReadable1() {
+ testGetNioBufferReadable(buffer.readOnlySlice(), DATA_SIZE);
+ }
+
+ @Test
+ public void testGetNioBufferReadable2() {
+ testGetNioBufferReadable(buffer.readOnlySlice(1, 2), 2);
+ }
+
+ private void testGetNioBufferReadable(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+ ByteBuffer sliceByteBuffer = slice.getNioBufferReadable();
+ assertTrue(sliceByteBuffer.isReadOnly());
+ assertEquals(sliceSize, sliceByteBuffer.remaining());
+ assertEquals(sliceSize, sliceByteBuffer.limit());
+ assertEquals(sliceSize, sliceByteBuffer.capacity());
+
+ // modify sliceByteBuffer position and verify nothing has changed in the original buffer
+ sliceByteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(DATA_SIZE, buffer.getSize());
+ assertEquals(sliceSize, slice.getSize());
+ }
+
+ @Test
+ public void testGetNioBuffer1() {
+ testGetNioBuffer(buffer.readOnlySlice(), DATA_SIZE);
+ }
+
+ @Test
+ public void testGetNioBuffer2() {
+ testGetNioBuffer(buffer.readOnlySlice(1, 2), 2);
+ }
+
+ private void testGetNioBuffer(ReadOnlySlicedNetworkBuffer slice, int sliceSize) {
+ ByteBuffer sliceByteBuffer = slice.getNioBuffer(1, 1);
+ assertTrue(sliceByteBuffer.isReadOnly());
+ assertEquals(1, sliceByteBuffer.remaining());
+ assertEquals(1, sliceByteBuffer.limit());
+ assertEquals(1, sliceByteBuffer.capacity());
+
+ // modify sliceByteBuffer position and verify nothing has changed in the original buffer
+ sliceByteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(DATA_SIZE, buffer.getSize());
+ assertEquals(sliceSize, slice.getSize());
+ }
+
+ @Test
+ public void testGetNioBufferReadableThreadSafe1() {
+ BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice());
+ }
+
+ @Test
+ public void testGetNioBufferReadableThreadSafe2() {
+ BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice(1, 2));
+ }
+
+ @Test
+ public void testGetNioBufferThreadSafe1() {
+ BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(), DATA_SIZE);
+ }
+
+ @Test
+ public void testGetNioBufferThreadSafe2() {
+ BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(1, 2), 2);
+ }
+
+ @Test
+ public void testForwardsSetAllocator() {
+ testForwardsSetAllocator(buffer.readOnlySlice());
+ testForwardsSetAllocator(buffer.readOnlySlice(1, 2));
+ }
+
+ private void testForwardsSetAllocator(ReadOnlySlicedNetworkBuffer slice) {
+ NettyBufferPool allocator = new NettyBufferPool(1);
+ slice.setAllocator(allocator);
+ assertSame(buffer.alloc(), slice.alloc());
+ assertSame(allocator, slice.alloc());
+ }
+}