You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/06/13 07:52:18 UTC
[3/6] flink git commit: [hotfix][runtime-test] Rename BufferTest to
NetworkBufferTest
[hotfix][runtime-test] Rename BufferTest to NetworkBufferTest
BufferTest was not testing a Buffer interface but NetworkBuffer class.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf0052ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf0052ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf0052ee
Branch: refs/heads/master
Commit: cf0052eef6896893c3af1897743694865bb58b68
Parents: 8581f57
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Wed May 30 11:24:53 2018 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Wed Jun 13 09:51:44 2018 +0200
----------------------------------------------------------------------
.../runtime/io/network/buffer/BufferTest.java | 473 -------------------
.../io/network/buffer/NetworkBufferTest.java | 473 +++++++++++++++++++
.../buffer/ReadOnlySlicedBufferTest.java | 8 +-
3 files changed, 477 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cf0052ee/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
deleted file mode 100644
index b5eb6ab..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.buffer;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for the {@link Buffer} class.
- */
-public class BufferTest extends AbstractByteBufTest {
-
- /**
- * Upper limit for the max size that is sufficient for all the tests.
- */
- private static final int MAX_CAPACITY_UPPER_BOUND = 64 * 1024 * 1024;
-
- private static final NettyBufferPool NETTY_BUFFER_POOL = new NettyBufferPool(1);
-
- @Override
- protected NetworkBuffer newBuffer(int length, int maxCapacity) {
- return newBuffer(length, maxCapacity, false);
- }
-
- /**
- * Creates a new buffer for testing.
- *
- * @param length
- * buffer capacity
- * @param maxCapacity
- * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
- * @param isBuffer
- * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
- *
- * @return the buffer
- */
- private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer) {
- return newBuffer(length, maxCapacity, isBuffer, FreeingBufferRecycler.INSTANCE);
- }
-
- /**
- * Creates a new buffer for testing.
- *
- * @param length
- * buffer capacity
- * @param maxCapacity
- * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
- * @param isBuffer
- * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
- * @param recycler
- * the buffer recycler to use
- *
- * @return the buffer
- */
- private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer, BufferRecycler recycler) {
- final MemorySegment segment =
- MemorySegmentFactory
- .allocateUnpooledSegment(Math.min(maxCapacity, MAX_CAPACITY_UPPER_BOUND));
-
- NetworkBuffer buffer = new NetworkBuffer(segment, recycler, isBuffer);
- buffer.capacity(length);
- buffer.setAllocator(NETTY_BUFFER_POOL);
-
- assertSame(ByteOrder.BIG_ENDIAN, buffer.order());
- assertEquals(0, buffer.readerIndex());
- assertEquals(0, buffer.writerIndex());
- return buffer;
- }
-
- @Test
- public void testDataBufferIsBuffer() {
- assertFalse(newBuffer(1024, 1024, false).isBuffer());
- }
-
- @Test
- public void testEventBufferIsBuffer() {
- assertFalse(newBuffer(1024, 1024, false).isBuffer());
- }
-
- @Test
- public void testDataBufferTagAsEvent() {
- testTagAsEvent(true);
- }
-
- @Test
- public void testEventBufferTagAsEvent() {
- testTagAsEvent(false);
- }
-
- private static void testTagAsEvent(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- buffer.tagAsEvent();
- assertFalse(buffer.isBuffer());
- }
-
- @Test
- public void testDataBufferGetMemorySegment() {
- testGetMemorySegment(true);
- }
-
- @Test
- public void testEventBufferGetMemorySegment() {
- testGetMemorySegment(false);
- }
-
- private static void testGetMemorySegment(boolean isBuffer) {
- final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
- NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, isBuffer);
- assertSame(segment, buffer.getMemorySegment());
- }
-
- @Test
- public void testDataBufferGetRecycler() {
- testGetRecycler(true);
- }
-
- @Test
- public void testEventBufferGetRecycler() {
- testGetRecycler(false);
- }
-
- private static void testGetRecycler(boolean isBuffer) {
- BufferRecycler recycler = MemorySegment::free;
-
- NetworkBuffer dataBuffer = newBuffer(1024, 1024, isBuffer, recycler);
- assertSame(recycler, dataBuffer.getRecycler());
- }
-
- @Test
- public void testDataBufferRecycleBuffer() {
- testRecycleBuffer(true);
- }
-
- @Test
- public void testEventBufferRecycleBuffer() {
- testRecycleBuffer(false);
- }
-
- /**
- * Tests that {@link NetworkBuffer#recycleBuffer()} and {@link NetworkBuffer#isRecycled()} are
- * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
- */
- private static void testRecycleBuffer(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- assertFalse(buffer.isRecycled());
- buffer.recycleBuffer();
- assertTrue(buffer.isRecycled());
- assertEquals(0, buffer.refCnt());
- }
-
- @Test
- public void testDataBufferRetainBuffer() {
- testRetainBuffer(true);
- }
-
- @Test
- public void testEventBufferRetainBuffer() {
- testRetainBuffer(false);
- }
-
- /**
- * Tests that {@link NetworkBuffer#retainBuffer()} and {@link NetworkBuffer#isRecycled()} are
- * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
- */
- private static void testRetainBuffer(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- assertFalse(buffer.isRecycled());
- buffer.retainBuffer();
- assertFalse(buffer.isRecycled());
- assertEquals(2, buffer.refCnt());
- }
-
- @Test
- public void testDataBufferCreateSlice1() {
- testCreateSlice1(true);
- }
-
- @Test
- public void testEventBufferCreateSlice1() {
- testCreateSlice1(false);
- }
-
- private static void testCreateSlice1(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- buffer.setSize(10); // fake some data
- ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
-
- assertEquals(0, slice.getReaderIndex());
- assertEquals(10, slice.getSize());
- assertEquals(10, slice.getSizeUnsafe());
- assertSame(buffer, slice.unwrap().unwrap());
-
- // slice indices should be independent:
- buffer.setSize(8);
- buffer.setReaderIndex(2);
- assertEquals(0, slice.getReaderIndex());
- assertEquals(10, slice.getSize());
- assertEquals(10, slice.getSizeUnsafe());
- }
-
- @Test
- public void testDataBufferCreateSlice2() {
- testCreateSlice2(true);
- }
-
- @Test
- public void testEventBufferCreateSlice2() {
- testCreateSlice2(false);
- }
-
- private static void testCreateSlice2(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- buffer.setSize(2); // fake some data
- ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 10);
-
- assertEquals(0, slice.getReaderIndex());
- assertEquals(10, slice.getSize());
- assertEquals(10, slice.getSizeUnsafe());
- assertSame(buffer, slice.unwrap().unwrap());
-
- // slice indices should be independent:
- buffer.setSize(8);
- buffer.setReaderIndex(2);
- assertEquals(0, slice.getReaderIndex());
- assertEquals(10, slice.getSize());
- assertEquals(10, slice.getSizeUnsafe());
- }
-
- @Test
- public void testDataBufferGetMaxCapacity() {
- testGetMaxCapacity(true);
- }
-
- @Test
- public void testEventBufferGetMaxCapacity() {
- testGetMaxCapacity(false);
- }
-
- private static void testGetMaxCapacity(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
- assertEquals(1024, buffer.getMaxCapacity());
- MemorySegment segment = buffer.getMemorySegment();
- Assert.assertEquals(segment.size(), buffer.getMaxCapacity());
- Assert.assertEquals(segment.size(), buffer.maxCapacity());
- }
-
- @Test
- public void testDataBufferGetSetReaderIndex() {
- testGetSetReaderIndex(true);
- }
-
- @Test
- public void testEventBufferGetSetReaderIndex() {
- testGetSetReaderIndex(false);
- }
-
- /**
- * Tests that {@link NetworkBuffer#setReaderIndex(int)} and
- * {@link NetworkBuffer#getReaderIndex()} are consistent.
- */
- private static void testGetSetReaderIndex(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
- assertEquals(0, buffer.getReaderIndex());
-
- // fake some data
- buffer.setSize(100);
- assertEquals(0, buffer.getReaderIndex());
- buffer.setReaderIndex(1);
- assertEquals(1, buffer.getReaderIndex());
- }
-
- @Test
- public void testDataBufferSetGetSize() {
- testSetGetSize(true);
- }
-
- @Test
- public void testEventBufferSetGetSize() {
- testSetGetSize(false);
- }
-
- private static void testSetGetSize(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
-
- assertEquals(0, buffer.getSize()); // initially 0
- assertEquals(0, buffer.getSizeUnsafe());
- assertEquals(buffer.writerIndex(), buffer.getSize());
- assertEquals(0, buffer.readerIndex()); // initially 0
-
- buffer.setSize(10);
- assertEquals(10, buffer.getSize());
- assertEquals(10, buffer.getSizeUnsafe());
- assertEquals(buffer.writerIndex(), buffer.getSize());
- assertEquals(0, buffer.readerIndex()); // independent
- }
-
- @Test
- public void testDataBufferReadableBytes() {
- testReadableBytes(true);
- }
-
- @Test
- public void testEventBufferReadableBytes() {
- testReadableBytes(false);
- }
-
- private static void testReadableBytes(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
-
- assertEquals(0, buffer.readableBytes());
- buffer.setSize(10);
- assertEquals(10, buffer.readableBytes());
- buffer.setReaderIndex(2);
- assertEquals(8, buffer.readableBytes());
- buffer.setReaderIndex(10);
- assertEquals(0, buffer.readableBytes());
- }
-
- @Test
- public void testDataBufferGetNioBufferReadable() {
- testGetNioBufferReadable(true);
- }
-
- @Test
- public void testEventBufferGetNioBufferReadable() {
- testGetNioBufferReadable(false);
- }
-
- private void testGetNioBufferReadable(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
-
- ByteBuffer byteBuffer = buffer.getNioBufferReadable();
- assertFalse(byteBuffer.isReadOnly());
- assertEquals(0, byteBuffer.remaining());
- assertEquals(0, byteBuffer.limit());
- assertEquals(0, byteBuffer.capacity());
-
- // add some data
- buffer.setSize(10);
- // nothing changes in the byteBuffer
- assertEquals(0, byteBuffer.remaining());
- assertEquals(0, byteBuffer.limit());
- assertEquals(0, byteBuffer.capacity());
- // get a new byteBuffer (should have updated indices)
- byteBuffer = buffer.getNioBufferReadable();
- assertFalse(byteBuffer.isReadOnly());
- assertEquals(10, byteBuffer.remaining());
- assertEquals(10, byteBuffer.limit());
- assertEquals(10, byteBuffer.capacity());
-
- // modify byteBuffer position and verify nothing has changed in the original buffer
- byteBuffer.position(1);
- assertEquals(0, buffer.getReaderIndex());
- assertEquals(10, buffer.getSize());
- }
-
- @Test
- public void testGetNioBufferReadableThreadSafe() {
- NetworkBuffer buffer = newBuffer(1024, 1024);
- testGetNioBufferReadableThreadSafe(buffer);
- }
-
- static void testGetNioBufferReadableThreadSafe(Buffer buffer) {
- ByteBuffer buf1 = buffer.getNioBufferReadable();
- ByteBuffer buf2 = buffer.getNioBufferReadable();
-
- assertNotNull(buf1);
- assertNotNull(buf2);
-
- assertTrue("Repeated call to getNioBuffer() returns the same nio buffer", buf1 != buf2);
- }
-
- @Test
- public void testDataBufferGetNioBuffer() {
- testGetNioBuffer(true);
- }
-
- @Test
- public void testEventBufferGetNioBuffer() {
- testGetNioBuffer(false);
- }
-
- private void testGetNioBuffer(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
-
- ByteBuffer byteBuffer = buffer.getNioBuffer(1, 1);
- assertFalse(byteBuffer.isReadOnly());
- assertEquals(1, byteBuffer.remaining());
- assertEquals(1, byteBuffer.limit());
- assertEquals(1, byteBuffer.capacity());
-
- // add some data
- buffer.setSize(10);
- // nothing changes in the byteBuffer
- assertEquals(1, byteBuffer.remaining());
- assertEquals(1, byteBuffer.limit());
- assertEquals(1, byteBuffer.capacity());
- // get a new byteBuffer (should have updated indices)
- byteBuffer = buffer.getNioBuffer(1, 2);
- assertFalse(byteBuffer.isReadOnly());
- assertEquals(2, byteBuffer.remaining());
- assertEquals(2, byteBuffer.limit());
- assertEquals(2, byteBuffer.capacity());
-
- // modify byteBuffer position and verify nothing has changed in the original buffer
- byteBuffer.position(1);
- assertEquals(0, buffer.getReaderIndex());
- assertEquals(10, buffer.getSize());
- }
-
- @Test
- public void testGetNioBufferThreadSafe() {
- NetworkBuffer buffer = newBuffer(1024, 1024);
- testGetNioBufferThreadSafe(buffer, 10);
- }
-
- static void testGetNioBufferThreadSafe(Buffer buffer, int length) {
- ByteBuffer buf1 = buffer.getNioBuffer(0, length);
- ByteBuffer buf2 = buffer.getNioBuffer(0, length);
-
- assertNotNull(buf1);
- assertNotNull(buf2);
-
- assertTrue("Repeated call to getNioBuffer(int, int) returns the same nio buffer", buf1 != buf2);
- }
-
- @Test
- public void testDataBufferSetAllocator() {
- testSetAllocator(true);
- }
-
- @Test
- public void testEventBufferSetAllocator() {
- testSetAllocator(false);
- }
-
- private void testSetAllocator(boolean isBuffer) {
- NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- NettyBufferPool allocator = new NettyBufferPool(1);
-
- buffer.setAllocator(allocator);
- assertSame(allocator, buffer.alloc());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf0052ee/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
new file mode 100644
index 0000000..47615d9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link NetworkBuffer} class.
+ */
+public class NetworkBufferTest extends AbstractByteBufTest {
+
+ /**
+ * Upper limit for the max size that is sufficient for all the tests.
+ */
+ private static final int MAX_CAPACITY_UPPER_BOUND = 64 * 1024 * 1024;
+
+ private static final NettyBufferPool NETTY_BUFFER_POOL = new NettyBufferPool(1);
+
+ @Override
+ protected NetworkBuffer newBuffer(int length, int maxCapacity) {
+ return newBuffer(length, maxCapacity, false);
+ }
+
+ /**
+ * Creates a new buffer for testing.
+ *
+ * @param length
+ * buffer capacity
+ * @param maxCapacity
+ * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+ * @param isBuffer
+ * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+ *
+ * @return the buffer
+ */
+ private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer) {
+ return newBuffer(length, maxCapacity, isBuffer, FreeingBufferRecycler.INSTANCE);
+ }
+
+ /**
+ * Creates a new buffer for testing.
+ *
+ * @param length
+ * buffer capacity
+ * @param maxCapacity
+ * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+ * @param isBuffer
+ * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+ * @param recycler
+ * the buffer recycler to use
+ *
+ * @return the buffer
+ */
+ private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer, BufferRecycler recycler) {
+ final MemorySegment segment =
+ MemorySegmentFactory
+ .allocateUnpooledSegment(Math.min(maxCapacity, MAX_CAPACITY_UPPER_BOUND));
+
+ NetworkBuffer buffer = new NetworkBuffer(segment, recycler, isBuffer);
+ buffer.capacity(length);
+ buffer.setAllocator(NETTY_BUFFER_POOL);
+
+ assertSame(ByteOrder.BIG_ENDIAN, buffer.order());
+ assertEquals(0, buffer.readerIndex());
+ assertEquals(0, buffer.writerIndex());
+ return buffer;
+ }
+
+ @Test
+ public void testDataBufferIsBuffer() {
+ assertFalse(newBuffer(1024, 1024, false).isBuffer());
+ }
+
+ @Test
+ public void testEventBufferIsBuffer() {
+ assertFalse(newBuffer(1024, 1024, false).isBuffer());
+ }
+
+ @Test
+ public void testDataBufferTagAsEvent() {
+ testTagAsEvent(true);
+ }
+
+ @Test
+ public void testEventBufferTagAsEvent() {
+ testTagAsEvent(false);
+ }
+
+ private static void testTagAsEvent(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.tagAsEvent();
+ assertFalse(buffer.isBuffer());
+ }
+
+ @Test
+ public void testDataBufferGetMemorySegment() {
+ testGetMemorySegment(true);
+ }
+
+ @Test
+ public void testEventBufferGetMemorySegment() {
+ testGetMemorySegment(false);
+ }
+
+ private static void testGetMemorySegment(boolean isBuffer) {
+ final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, isBuffer);
+ assertSame(segment, buffer.getMemorySegment());
+ }
+
+ @Test
+ public void testDataBufferGetRecycler() {
+ testGetRecycler(true);
+ }
+
+ @Test
+ public void testEventBufferGetRecycler() {
+ testGetRecycler(false);
+ }
+
+ private static void testGetRecycler(boolean isBuffer) {
+ BufferRecycler recycler = MemorySegment::free;
+
+ NetworkBuffer dataBuffer = newBuffer(1024, 1024, isBuffer, recycler);
+ assertSame(recycler, dataBuffer.getRecycler());
+ }
+
+ @Test
+ public void testDataBufferRecycleBuffer() {
+ testRecycleBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferRecycleBuffer() {
+ testRecycleBuffer(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#recycleBuffer()} and {@link NetworkBuffer#isRecycled()} are
+ * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
+ */
+ private static void testRecycleBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ assertFalse(buffer.isRecycled());
+ buffer.recycleBuffer();
+ assertTrue(buffer.isRecycled());
+ assertEquals(0, buffer.refCnt());
+ }
+
+ @Test
+ public void testDataBufferRetainBuffer() {
+ testRetainBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferRetainBuffer() {
+ testRetainBuffer(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#retainBuffer()} and {@link NetworkBuffer#isRecycled()} are
+ * coupled and are also consistent with {@link NetworkBuffer#refCnt()}.
+ */
+ private static void testRetainBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ assertFalse(buffer.isRecycled());
+ buffer.retainBuffer();
+ assertFalse(buffer.isRecycled());
+ assertEquals(2, buffer.refCnt());
+ }
+
+ @Test
+ public void testDataBufferCreateSlice1() {
+ testCreateSlice1(true);
+ }
+
+ @Test
+ public void testEventBufferCreateSlice1() {
+ testCreateSlice1(false);
+ }
+
+ private static void testCreateSlice1(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.setSize(10); // fake some data
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice();
+
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ assertSame(buffer, slice.unwrap().unwrap());
+
+ // slice indices should be independent:
+ buffer.setSize(8);
+ buffer.setReaderIndex(2);
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ }
+
+ @Test
+ public void testDataBufferCreateSlice2() {
+ testCreateSlice2(true);
+ }
+
+ @Test
+ public void testEventBufferCreateSlice2() {
+ testCreateSlice2(false);
+ }
+
+ private static void testCreateSlice2(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.setSize(2); // fake some data
+ ReadOnlySlicedNetworkBuffer slice = buffer.readOnlySlice(1, 10);
+
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ assertSame(buffer, slice.unwrap().unwrap());
+
+ // slice indices should be independent:
+ buffer.setSize(8);
+ buffer.setReaderIndex(2);
+ assertEquals(0, slice.getReaderIndex());
+ assertEquals(10, slice.getSize());
+ assertEquals(10, slice.getSizeUnsafe());
+ }
+
+ @Test
+ public void testDataBufferGetMaxCapacity() {
+ testGetMaxCapacity(true);
+ }
+
+ @Test
+ public void testEventBufferGetMaxCapacity() {
+ testGetMaxCapacity(false);
+ }
+
+ private static void testGetMaxCapacity(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+ assertEquals(1024, buffer.getMaxCapacity());
+ MemorySegment segment = buffer.getMemorySegment();
+ Assert.assertEquals(segment.size(), buffer.getMaxCapacity());
+ Assert.assertEquals(segment.size(), buffer.maxCapacity());
+ }
+
+ @Test
+ public void testDataBufferGetSetReaderIndex() {
+ testGetSetReaderIndex(true);
+ }
+
+ @Test
+ public void testEventBufferGetSetReaderIndex() {
+ testGetSetReaderIndex(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#setReaderIndex(int)} and
+ * {@link NetworkBuffer#getReaderIndex()} are consistent.
+ */
+ private static void testGetSetReaderIndex(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+ assertEquals(0, buffer.getReaderIndex());
+
+ // fake some data
+ buffer.setSize(100);
+ assertEquals(0, buffer.getReaderIndex());
+ buffer.setReaderIndex(1);
+ assertEquals(1, buffer.getReaderIndex());
+ }
+
+ @Test
+ public void testDataBufferSetGetSize() {
+ testSetGetSize(true);
+ }
+
+ @Test
+ public void testEventBufferSetGetSize() {
+ testSetGetSize(false);
+ }
+
+ private static void testSetGetSize(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ assertEquals(0, buffer.getSize()); // initially 0
+ assertEquals(0, buffer.getSizeUnsafe());
+ assertEquals(buffer.writerIndex(), buffer.getSize());
+ assertEquals(0, buffer.readerIndex()); // initially 0
+
+ buffer.setSize(10);
+ assertEquals(10, buffer.getSize());
+ assertEquals(10, buffer.getSizeUnsafe());
+ assertEquals(buffer.writerIndex(), buffer.getSize());
+ assertEquals(0, buffer.readerIndex()); // independent
+ }
+
+ @Test
+ public void testDataBufferReadableBytes() {
+ testReadableBytes(true);
+ }
+
+ @Test
+ public void testEventBufferReadableBytes() {
+ testReadableBytes(false);
+ }
+
+ private static void testReadableBytes(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ assertEquals(0, buffer.readableBytes());
+ buffer.setSize(10);
+ assertEquals(10, buffer.readableBytes());
+ buffer.setReaderIndex(2);
+ assertEquals(8, buffer.readableBytes());
+ buffer.setReaderIndex(10);
+ assertEquals(0, buffer.readableBytes());
+ }
+
+ @Test
+ public void testDataBufferGetNioBufferReadable() {
+ testGetNioBufferReadable(true);
+ }
+
+ @Test
+ public void testEventBufferGetNioBufferReadable() {
+ testGetNioBufferReadable(false);
+ }
+
+ private void testGetNioBufferReadable(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(0, byteBuffer.remaining());
+ assertEquals(0, byteBuffer.limit());
+ assertEquals(0, byteBuffer.capacity());
+
+ // add some data
+ buffer.setSize(10);
+ // nothing changes in the byteBuffer
+ assertEquals(0, byteBuffer.remaining());
+ assertEquals(0, byteBuffer.limit());
+ assertEquals(0, byteBuffer.capacity());
+ // get a new byteBuffer (should have updated indices)
+ byteBuffer = buffer.getNioBufferReadable();
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(10, byteBuffer.remaining());
+ assertEquals(10, byteBuffer.limit());
+ assertEquals(10, byteBuffer.capacity());
+
+ // modify byteBuffer position and verify nothing has changed in the original buffer
+ byteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(10, buffer.getSize());
+ }
+
+ @Test
+ public void testGetNioBufferReadableThreadSafe() {
+ NetworkBuffer buffer = newBuffer(1024, 1024);
+ testGetNioBufferReadableThreadSafe(buffer);
+ }
+
+ static void testGetNioBufferReadableThreadSafe(Buffer buffer) {
+ ByteBuffer buf1 = buffer.getNioBufferReadable();
+ ByteBuffer buf2 = buffer.getNioBufferReadable();
+
+ assertNotNull(buf1);
+ assertNotNull(buf2);
+
+ assertTrue("Repeated call to getNioBuffer() returns the same nio buffer", buf1 != buf2);
+ }
+
+ @Test
+ public void testDataBufferGetNioBuffer() {
+ testGetNioBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferGetNioBuffer() {
+ testGetNioBuffer(false);
+ }
+
+ private void testGetNioBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ ByteBuffer byteBuffer = buffer.getNioBuffer(1, 1);
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(1, byteBuffer.remaining());
+ assertEquals(1, byteBuffer.limit());
+ assertEquals(1, byteBuffer.capacity());
+
+ // add some data
+ buffer.setSize(10);
+ // nothing changes in the byteBuffer
+ assertEquals(1, byteBuffer.remaining());
+ assertEquals(1, byteBuffer.limit());
+ assertEquals(1, byteBuffer.capacity());
+ // get a new byteBuffer (should have updated indices)
+ byteBuffer = buffer.getNioBuffer(1, 2);
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(2, byteBuffer.remaining());
+ assertEquals(2, byteBuffer.limit());
+ assertEquals(2, byteBuffer.capacity());
+
+ // modify byteBuffer position and verify nothing has changed in the original buffer
+ byteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(10, buffer.getSize());
+ }
+
+ @Test
+ public void testGetNioBufferThreadSafe() {
+ NetworkBuffer buffer = newBuffer(1024, 1024);
+ testGetNioBufferThreadSafe(buffer, 10);
+ }
+
+ static void testGetNioBufferThreadSafe(Buffer buffer, int length) {
+ ByteBuffer buf1 = buffer.getNioBuffer(0, length);
+ ByteBuffer buf2 = buffer.getNioBuffer(0, length);
+
+ assertNotNull(buf1);
+ assertNotNull(buf2);
+
+ assertTrue("Repeated call to getNioBuffer(int, int) returns the same nio buffer", buf1 != buf2);
+ }
+
+ @Test
+ public void testDataBufferSetAllocator() {
+ testSetAllocator(true);
+ }
+
+ @Test
+ public void testEventBufferSetAllocator() {
+ testSetAllocator(false);
+ }
+
+ private void testSetAllocator(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ NettyBufferPool allocator = new NettyBufferPool(1);
+
+ buffer.setAllocator(allocator);
+ assertSame(allocator, buffer.alloc());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/cf0052ee/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index 9f9d575..d5814ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -322,22 +322,22 @@ public class ReadOnlySlicedBufferTest {
@Test
public void testGetNioBufferReadableThreadSafe1() {
- BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice());
+ NetworkBufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice());
}
@Test
public void testGetNioBufferReadableThreadSafe2() {
- BufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice(1, 2));
+ NetworkBufferTest.testGetNioBufferReadableThreadSafe(buffer.readOnlySlice(1, 2));
}
@Test
public void testGetNioBufferThreadSafe1() {
- BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(), DATA_SIZE);
+ NetworkBufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(), DATA_SIZE);
}
@Test
public void testGetNioBufferThreadSafe2() {
- BufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(1, 2), 2);
+ NetworkBufferTest.testGetNioBufferThreadSafe(buffer.readOnlySlice(1, 2), 2);
}
@Test