You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/18 14:25:55 UTC
[4/9] flink git commit: [FLINK-7520][network] let our Buffer class
extend from netty's buffer class
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
index 87aab01..d0df02d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
@@ -105,7 +105,7 @@ public class BufferBuilderTest {
}
private static void assertBufferContent(Buffer actualBuffer, int... expected) {
- assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer());
+ assertEquals(toByteBuffer(expected), actualBuffer.getNioBufferReadable());
}
private static BufferBuilder createBufferBuilder() {
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 57c8077..f59fa9f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -20,54 +20,392 @@ package org.apache.flink.runtime.io.network.buffer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
-public class BufferTest {
+/**
+ * Tests for the {@link Buffer} class.
+ */
+public class BufferTest extends AbstractByteBufTest {
+
+ /**
+ * Upper limit for the max size that is sufficient for all the tests.
+ */
+ private static final int MAX_CAPACITY_UPPER_BOUND = 64 * 1024 * 1024;
+
+ private static final NettyBufferPool NETTY_BUFFER_POOL = new NettyBufferPool(1);
+
+ @Override
+ protected NetworkBuffer newBuffer(int length, int maxCapacity) {
+ return newBuffer(length, maxCapacity, false);
+ }
+
+ /**
+ * Creates a new buffer for testing.
+ *
+ * @param length
+ * buffer capacity
+ * @param maxCapacity
+ * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+ * @param isBuffer
+ * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+ *
+ * @return the buffer
+ */
+ private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer) {
+ return newBuffer(length, maxCapacity, isBuffer, FreeingBufferRecycler.INSTANCE);
+ }
+
+ /**
+ * Creates a new buffer for testing.
+ *
+ * @param length
+ * buffer capacity
+ * @param maxCapacity
+ * buffer maximum capacity (will be used for the underlying {@link MemorySegment})
+ * @param isBuffer
+ * whether the buffer should represent data (<tt>true</tt>) or an event (<tt>false</tt>)
+ * @param recycler
+ * the buffer recycler to use
+ *
+ * @return the buffer
+ */
+ private static NetworkBuffer newBuffer(int length, int maxCapacity, boolean isBuffer, BufferRecycler recycler) {
+ final MemorySegment segment =
+ MemorySegmentFactory
+ .allocateUnpooledSegment(Math.min(maxCapacity, MAX_CAPACITY_UPPER_BOUND));
+
+ NetworkBuffer buffer = new NetworkBuffer(segment, recycler, isBuffer);
+ buffer.capacity(length);
+ buffer.setAllocator(NETTY_BUFFER_POOL);
+
+ assertSame(ByteOrder.BIG_ENDIAN, buffer.order());
+ assertEquals(0, buffer.readerIndex());
+ assertEquals(0, buffer.writerIndex());
+ return buffer;
+ }
@Test
- public void testSetGetSize() {
- final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+ public void testDataBufferIsBuffer() {
+ assertFalse(newBuffer(1024, 1024, false).isBuffer());
+ }
+
+ @Test
+ public void testEventBufferIsBuffer() {
+ assertFalse(newBuffer(1024, 1024, false).isBuffer());
+ }
- Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
- Assert.assertEquals(segment.size(), buffer.getSize());
+ @Test
+ public void testDataBufferTagAsEvent() {
+ testTagAsEvent(true);
+ }
- buffer.setSize(segment.size() / 2);
- Assert.assertEquals(segment.size() / 2, buffer.getSize());
+ @Test
+ public void testEventBufferTagAsEvent() {
+ testTagAsEvent(false);
+ }
- try {
- buffer.setSize(-1);
- Assert.fail("Didn't throw expected exception");
- } catch (IllegalArgumentException e) {
- // OK => expected exception
- }
+ private static void testTagAsEvent(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ buffer.tagAsEvent();
+ assertFalse(buffer.isBuffer());
+ }
- try {
- buffer.setSize(segment.size() + 1);
- Assert.fail("Didn't throw expected exception");
- } catch (IllegalArgumentException e) {
- // OK => expected exception
- }
+ @Test
+ public void testDataBufferGetMemorySegment() {
+ testGetMemorySegment(true);
}
@Test
- public void testgetNioBufferThreadSafe() {
+ public void testEventBufferGetMemorySegment() {
+ testGetMemorySegment(false);
+ }
+
+ private static void testGetMemorySegment(boolean isBuffer) {
final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
+ NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, isBuffer);
+ assertSame(segment, buffer.getMemorySegment());
+ }
+
+ @Test
+ public void testDataBufferGetRecycler() {
+ testGetRecycler(true);
+ }
+
+ @Test
+ public void testEventBufferGetRecycler() {
+ testGetRecycler(false);
+ }
+
+ private static void testGetRecycler(boolean isBuffer) {
+ BufferRecycler recycler = MemorySegment::free;
+
+ NetworkBuffer dataBuffer = newBuffer(1024, 1024, isBuffer, recycler);
+ assertSame(recycler, dataBuffer.getRecycler());
+ }
+
+ @Test
+ public void testDataBufferRecycleBuffer() {
+ testRecycleBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferRecycleBuffer() {
+ testRecycleBuffer(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#recycle()} and {@link NetworkBuffer#isRecycled()} are coupled
+ * and are also consistent with {@link NetworkBuffer#refCnt()}.
+ */
+ private static void testRecycleBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ assertFalse(buffer.isRecycled());
+ buffer.recycle();
+ assertTrue(buffer.isRecycled());
+ assertEquals(0, buffer.refCnt());
+ }
+
+ @Test
+ public void testDataBufferRetainBuffer() {
+ testRetainBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferRetainBuffer() {
+ testRetainBuffer(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#retain()} and {@link NetworkBuffer#isRecycled()} are coupled
+ * and are also consistent with {@link NetworkBuffer#refCnt()}.
+ */
+ private static void testRetainBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ assertFalse(buffer.isRecycled());
+ buffer.retain();
+ assertFalse(buffer.isRecycled());
+ assertEquals(2, buffer.refCnt());
+ }
+
+ @Test
+ public void testDataBufferGetMaxCapacity() {
+ testGetMaxCapacity(true);
+ }
+
+ @Test
+ public void testEventBufferGetMaxCapacity() {
+ testGetMaxCapacity(false);
+ }
+
+ private static void testGetMaxCapacity(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+ assertEquals(1024, buffer.getMaxCapacity());
+ MemorySegment segment = buffer.getMemorySegment();
+ Assert.assertEquals(segment.size(), buffer.getMaxCapacity());
+ Assert.assertEquals(segment.size(), buffer.maxCapacity());
+ }
+
+ @Test
+ public void testDataBufferGetSetReaderIndex() {
+ testGetSetReaderIndex(true);
+ }
+
+ @Test
+ public void testEventBufferGetSetReaderIndex() {
+ testGetSetReaderIndex(false);
+ }
+
+ /**
+ * Tests that {@link NetworkBuffer#setReaderIndex(int)} and
+ * {@link NetworkBuffer#getReaderIndex()} are consistent.
+ */
+ private static void testGetSetReaderIndex(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(100, 1024, isBuffer);
+ assertEquals(0, buffer.getReaderIndex());
+
+ // fake some data
+ buffer.setSize(100);
+ assertEquals(0, buffer.getReaderIndex());
+ buffer.setReaderIndex(1);
+ assertEquals(1, buffer.getReaderIndex());
+ }
+
+ @Test
+ public void testDataBufferSetGetSize() {
+ testSetGetSize(true);
+ }
+
+ @Test
+ public void testEventBufferSetGetSize() {
+ testSetGetSize(false);
+ }
+
+ private static void testSetGetSize(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ assertEquals(0, buffer.getSize()); // initially 0
+ assertEquals(0, buffer.getSizeUnsafe());
+ assertEquals(buffer.writerIndex(), buffer.getSize());
+ assertEquals(0, buffer.readerIndex()); // initially 0
+
+ buffer.setSize(10);
+ assertEquals(10, buffer.getSize());
+ assertEquals(10, buffer.getSizeUnsafe());
+ assertEquals(buffer.writerIndex(), buffer.getSize());
+ assertEquals(0, buffer.readerIndex()); // independent
+ }
+
+ @Test
+ public void testDataBufferReadableBytes() {
+ testReadableBytes(true);
+ }
+
+ @Test
+ public void testEventBufferReadableBytes() {
+ testReadableBytes(false);
+ }
+
+ private static void testReadableBytes(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ assertEquals(0, buffer.readableBytes());
+ buffer.setSize(10);
+ assertEquals(10, buffer.readableBytes());
+ buffer.setReaderIndex(2);
+ assertEquals(8, buffer.readableBytes());
+ buffer.setReaderIndex(10);
+ assertEquals(0, buffer.readableBytes());
+ }
+
+ @Test
+ public void testDataBufferGetNioBufferReadable() {
+ testGetNioBufferReadable(true);
+ }
+
+ @Test
+ public void testEventBufferGetNioBufferReadable() {
+ testGetNioBufferReadable(false);
+ }
+
+ private void testGetNioBufferReadable(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
- Buffer buffer = new Buffer(segment, FreeingBufferRecycler.INSTANCE);
+ ByteBuffer byteBuffer = buffer.getNioBufferReadable();
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(0, byteBuffer.remaining());
+ assertEquals(0, byteBuffer.limit());
+ assertEquals(0, byteBuffer.capacity());
- ByteBuffer buf1 = buffer.getNioBuffer();
- ByteBuffer buf2 = buffer.getNioBuffer();
+ // add some data
+ buffer.setSize(10);
+ // nothing changes in the byteBuffer
+ assertEquals(0, byteBuffer.remaining());
+ assertEquals(0, byteBuffer.limit());
+ assertEquals(0, byteBuffer.capacity());
+ // get a new byteBuffer (should have updated indices)
+ byteBuffer = buffer.getNioBufferReadable();
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(10, byteBuffer.remaining());
+ assertEquals(10, byteBuffer.limit());
+ assertEquals(10, byteBuffer.capacity());
+
+ // modify byteBuffer position and verify nothing has changed in the original buffer
+ byteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(10, buffer.getSize());
+ }
+
+ @Test
+ public void testGetNioBufferReadableThreadSafe() {
+ NetworkBuffer buffer = newBuffer(1024, 1024);
+
+ ByteBuffer buf1 = buffer.getNioBufferReadable();
+ ByteBuffer buf2 = buffer.getNioBufferReadable();
assertNotNull(buf1);
assertNotNull(buf2);
assertTrue("Repeated call to getNioBuffer() returns the same nio buffer", buf1 != buf2);
}
+
+ @Test
+ public void testDataBufferGetNioBuffer() {
+ testGetNioBuffer(true);
+ }
+
+ @Test
+ public void testEventBufferGetNioBuffer() {
+ testGetNioBuffer(false);
+ }
+
+ private void testGetNioBuffer(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+
+ ByteBuffer byteBuffer = buffer.getNioBuffer(1, 1);
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(1, byteBuffer.remaining());
+ assertEquals(1, byteBuffer.limit());
+ assertEquals(1, byteBuffer.capacity());
+
+ // add some data
+ buffer.setSize(10);
+ // nothing changes in the byteBuffer
+ assertEquals(1, byteBuffer.remaining());
+ assertEquals(1, byteBuffer.limit());
+ assertEquals(1, byteBuffer.capacity());
+ // get a new byteBuffer (should have updated indices)
+ byteBuffer = buffer.getNioBuffer(1, 2);
+ assertFalse(byteBuffer.isReadOnly());
+ assertEquals(2, byteBuffer.remaining());
+ assertEquals(2, byteBuffer.limit());
+ assertEquals(2, byteBuffer.capacity());
+
+ // modify byteBuffer position and verify nothing has changed in the original buffer
+ byteBuffer.position(1);
+ assertEquals(0, buffer.getReaderIndex());
+ assertEquals(10, buffer.getSize());
+ }
+
+ @Test
+ public void testGetNioBufferThreadSafe() {
+ NetworkBuffer buffer = newBuffer(1024, 1024);
+
+ ByteBuffer buf1 = buffer.getNioBuffer(0, 10);
+ ByteBuffer buf2 = buffer.getNioBuffer(0, 10);
+
+ assertNotNull(buf1);
+ assertNotNull(buf2);
+
+ assertTrue("Repeated call to getNioBuffer(int, int) returns the same nio buffer", buf1 != buf2);
+ }
+
+ @Test
+ public void testDataBufferSetAllocator() {
+ testSetAllocator(true);
+ }
+
+ @Test
+ public void testEventBufferSetAllocator() {
+ testSetAllocator(false);
+ }
+
+ private void testSetAllocator(boolean isBuffer) {
+ NetworkBuffer buffer = newBuffer(1024, 1024, isBuffer);
+ NettyBufferPool allocator = new NettyBufferPool(1);
+
+ buffer.setAllocator(allocator);
+ assertSame(allocator, buffer.alloc());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
index 4acdb36..b51dc1e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.netty;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil.NettyServerAndClient;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
@@ -37,6 +38,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -189,7 +191,9 @@ public class CancelPartitionRequestTest {
@Nullable
@Override
public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException {
- return new BufferAndBacklog(bufferProvider.requestBufferBlocking(), 0);
+ Buffer buffer = bufferProvider.requestBufferBlocking();
+ buffer.setSize(buffer.getMaxCapacity()); // fake some data
+ return new BufferAndBacklog(buffer, 0);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
index daff1c9..ed5fff8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageSerializationTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.event.task.IntegerTaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -54,12 +55,13 @@ public class NettyMessageSerializationTest {
@Test
public void testEncodeDecode() {
{
- Buffer buffer = spy(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE));
- ByteBuffer nioBuffer = buffer.getNioBuffer();
+ Buffer buffer = spy(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE));
+ ByteBuffer nioBuffer = buffer.getNioBuffer(0, buffer.getMaxCapacity());
for (int i = 0; i < 1024; i += 4) {
nioBuffer.putInt(i);
}
+ buffer.setSize(1024);
NettyMessage.BufferResponse expected = new NettyMessage.BufferResponse(buffer, random.nextInt(), new InputChannelID(), random.nextInt());
NettyMessage.BufferResponse actual = encodeAndDecode(expected);
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 42a5f11..996722f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.netty.NettyMessage.AddCredit;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -51,6 +51,7 @@ import org.junit.Test;
import java.io.IOException;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -64,7 +65,6 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.hamcrest.Matchers.instanceOf;
public class PartitionRequestClientHandlerTest {
@@ -93,7 +93,7 @@ public class PartitionRequestClientHandlerTest {
when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
final BufferResponse receivedBuffer = createBufferResponse(
- TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2);
+ TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0, inputChannel.getInputChannelId(), 2);
final PartitionRequestClientHandler client = new PartitionRequestClientHandler();
client.addInputChannel(inputChannel);
@@ -110,15 +110,14 @@ public class PartitionRequestClientHandlerTest {
public void testReceiveEmptyBuffer() throws Exception {
// Minimal mock of a remote input channel
final BufferProvider bufferProvider = mock(BufferProvider.class);
- when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+ when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
when(inputChannel.getBufferProvider()).thenReturn(bufferProvider);
// An empty buffer of size 0
- final Buffer emptyBuffer = TestBufferFactory.createBuffer();
- emptyBuffer.setSize(0);
+ final Buffer emptyBuffer = TestBufferFactory.createBuffer(0);
final int backlog = 2;
final BufferResponse receivedBuffer = createBufferResponse(
@@ -186,7 +185,7 @@ public class PartitionRequestClientHandlerTest {
0, inputChannel.getNumberOfAvailableBuffers());
final BufferResponse bufferResponse = createBufferResponse(
- TestBufferFactory.createBuffer(), 0, inputChannel.getInputChannelId(), 2);
+ TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0, inputChannel.getInputChannelId(), 2);
handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse);
verify(inputChannel, times(1)).onError(any(IllegalStateException.class));
@@ -200,7 +199,7 @@ public class PartitionRequestClientHandlerTest {
public void testReceivePartitionNotFoundException() throws Exception {
// Minimal mock of a remote input channel
final BufferProvider bufferProvider = mock(BufferProvider.class);
- when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer());
+ when(bufferProvider.requestBuffer()).thenReturn(TestBufferFactory.createBuffer(0));
final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
when(inputChannel.getInputChannelId()).thenReturn(new InputChannelID());
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index 8b1e8d2..56ed622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -19,11 +19,9 @@
package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
@@ -107,7 +105,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
verify(listener, times(1)).notifyBuffersAvailable(eq(0L));
// Add data to the queue...
- subpartition.add(createBuffer());
+ subpartition.add(createBuffer(BUFFER_SIZE));
assertEquals(1, subpartition.getTotalNumberOfBuffers());
assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
@@ -127,7 +125,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
assertEquals(0, subpartition.getBuffersInBacklog());
// Add data to the queue...
- subpartition.add(createBuffer());
+ subpartition.add(createBuffer(BUFFER_SIZE));
assertEquals(2, subpartition.getTotalNumberOfBuffers());
assertEquals(1, subpartition.getBuffersInBacklog());
@@ -135,7 +133,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
// Add event to the queue...
- Buffer event = createBuffer();
+ Buffer event = createBuffer(BUFFER_SIZE);
event.tagAsEvent();
subpartition.add(event);
@@ -290,10 +288,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
private void testCleanupReleasedPartition(boolean createView) throws Exception {
PipelinedSubpartition partition = createSubpartition();
- Buffer buffer1 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
- FreeingBufferRecycler.INSTANCE);
- Buffer buffer2 = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096),
- FreeingBufferRecycler.INSTANCE);
+ Buffer buffer1 = createBuffer(4096);
+ Buffer buffer2 = createBuffer(4096);
boolean buffer1Recycled;
boolean buffer2Recycled;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 4512625..5f5f459 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -64,7 +64,7 @@ public class ResultPartitionTest {
// Pipelined, send message => notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, true);
- partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
verify(notifier, times(1))
.notifyPartitionConsumable(
eq(partition.getJobId()),
@@ -76,7 +76,7 @@ public class ResultPartitionTest {
// Pipelined, don't send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.PIPELINED, false);
- partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
@@ -84,7 +84,7 @@ public class ResultPartitionTest {
// Blocking, send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, true);
- partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
@@ -92,7 +92,7 @@ public class ResultPartitionTest {
// Blocking, don't send message => don't notify
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, ResultPartitionType.BLOCKING, false);
- partition.writeBuffer(TestBufferFactory.createBuffer(), 0);
+ partition.writeBuffer(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE), 0);
verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
}
}
@@ -114,7 +114,7 @@ public class ResultPartitionTest {
*/
protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
throws Exception {
- Buffer buffer = TestBufferFactory.createBuffer();
+ Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
try {
ResultPartition partition = createPartition(notifier, pipelined, true);
@@ -152,7 +152,7 @@ public class ResultPartitionTest {
*/
protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
throws Exception {
- Buffer buffer = TestBufferFactory.createBuffer();
+ Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
try {
ResultPartition partition = createPartition(notifier, pipelined, true);
@@ -217,7 +217,7 @@ public class ResultPartitionTest {
throws Exception {
ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
ResultPartition partition = createPartition(notifier, pipelined, true);
- Buffer buffer = TestBufferFactory.createBuffer();
+ Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
try {
// partition.add() adds the buffer without recycling it (if not spilling)
partition.writeBuffer(buffer, 0);
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 3b00b2e..9664945 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -193,7 +193,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
public void testConsumeSpilledPartition() throws Exception {
SpillableSubpartition partition = createSubpartition();
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
buffer.retain();
buffer.retain();
@@ -291,7 +291,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
SpillableSubpartition partition = createSubpartition();
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
buffer.retain();
buffer.retain();
@@ -403,7 +403,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(1, partition.getTotalNumberOfBuffers());
assertEquals(4, partition.getTotalNumberOfBytes());
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
try {
partition.add(buffer);
} finally {
@@ -447,7 +447,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(0, partition.releaseMemory());
}
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
boolean bufferRecycled;
try {
partition.add(buffer);
@@ -475,7 +475,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpillableSubpartition partition = createSubpartition(ioManager);
assertEquals(0, partition.releaseMemory());
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
boolean bufferRecycled;
try {
partition.add(buffer);
@@ -521,8 +521,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
IOManager ioManager = new IOManagerAsyncWithNoOpBufferFileWriter();
SpillableSubpartition partition = createSubpartition(ioManager);
- Buffer buffer1 = TestBufferFactory.createBuffer(4096);
- Buffer buffer2 = TestBufferFactory.createBuffer(4096);
+ Buffer buffer1 = TestBufferFactory.createBuffer(4096, 4096);
+ Buffer buffer2 = TestBufferFactory.createBuffer(4096, 4096);
try {
// we need two buffers because the view will use one of them and not release it
partition.add(buffer1);
@@ -569,7 +569,7 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
exception.expect(IOException.class);
- Buffer buffer = TestBufferFactory.createBuffer(4096);
+ Buffer buffer = TestBufferFactory.createBuffer(4096, 4096);
boolean bufferRecycled;
try {
partition.add(buffer);
@@ -635,8 +635,8 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
private void testCleanupReleasedPartition(boolean spilled, boolean createView) throws Exception {
SpillableSubpartition partition = createSubpartition();
- Buffer buffer1 = TestBufferFactory.createBuffer(4096);
- Buffer buffer2 = TestBufferFactory.createBuffer(4096);
+ Buffer buffer1 = TestBufferFactory.createBuffer(4096, 4096);
+ Buffer buffer2 = TestBufferFactory.createBuffer(4096, 4096);
boolean buffer1Recycled;
boolean buffer2Recycled;
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
index 89a4e03..08444f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewTest.java
@@ -213,7 +213,7 @@ public class SpilledSubpartitionViewTest {
final BufferFileWriter writer = IO_MANAGER.createBufferFileWriter(IO_MANAGER.createChannel());
for (int i = 0; i < numberOfBuffers; i++) {
- writer.writeBlock(TestBufferFactory.createBuffer());
+ writer.writeBlock(TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE));
}
writer.writeBlock(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE));
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 2d56258..5e12835 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,14 +18,13 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.createBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -57,7 +56,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(4, subpartition.getTotalNumberOfBytes());
- Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+ Buffer buffer = createBuffer(4096, 4096);
assertFalse(subpartition.add(buffer));
assertEquals(1, subpartition.getTotalNumberOfBuffers());
@@ -83,7 +82,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
assertEquals(0, subpartition.getBuffersInBacklog());
assertEquals(0, subpartition.getTotalNumberOfBytes());
- Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+ Buffer buffer = createBuffer(4096, 4096);
assertFalse(subpartition.add(buffer));
assertEquals(0, subpartition.getTotalNumberOfBuffers());
@@ -112,7 +111,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
private void verifyViewReleasedAfterParentRelease(ResultSubpartition partition) throws Exception {
// Add a buffer
- Buffer buffer = TestBufferFactory.createBuffer();
+ Buffer buffer = createBuffer(TestBufferFactory.BUFFER_SIZE);
partition.add(buffer);
partition.finish();
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index eab1d89..469aa97 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -69,7 +69,7 @@ public class RemoteInputChannelTest {
// Setup
final SingleInputGate inputGate = mock(SingleInputGate.class);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- final Buffer buffer = TestBufferFactory.createBuffer();
+ final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
// The test
inputChannel.onBuffer(buffer.retain(), 0, -1);
@@ -103,7 +103,7 @@ public class RemoteInputChannelTest {
// Setup
final ExecutorService executor = Executors.newFixedThreadPool(2);
- final Buffer buffer = TestBufferFactory.createBuffer();
+ final Buffer buffer = TestBufferFactory.createBuffer(TestBufferFactory.BUFFER_SIZE);
try {
// Test
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index f45d98e..364ac75 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -31,9 +31,9 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -126,7 +126,7 @@ public class SingleInputGateTest {
final ResultSubpartitionView iterator = mock(ResultSubpartitionView.class);
when(iterator.getNextBuffer()).thenReturn(
- new BufferAndBacklog(new Buffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0));
+ new BufferAndBacklog(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(1024), FreeingBufferRecycler.INSTANCE), 0));
final ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
when(partitionManager.createSubpartitionView(
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
index e0dc429..406b81f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestBufferFactory.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import javax.annotation.concurrent.ThreadSafe;
@@ -56,7 +57,7 @@ public class TestBufferFactory {
}
numberOfCreatedBuffers++;
- return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
+ return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), bufferRecycler);
}
public synchronized int getNumberOfCreatedBuffers() {
@@ -71,13 +72,31 @@ public class TestBufferFactory {
// Static test helpers
// ------------------------------------------------------------------------
- public static Buffer createBuffer() {
- return createBuffer(BUFFER_SIZE);
+ /**
+ * Creates a (network) buffer with default size, i.e. {@link #BUFFER_SIZE}, and unspecified data
+ * of the given size.
+ *
+ * @param dataSize
+ * size of the data in the buffer, i.e. the new writer index
+ *
+ * @return a new buffer instance
+ */
+ public static Buffer createBuffer(int dataSize) {
+ return createBuffer(BUFFER_SIZE, dataSize);
}
- public static Buffer createBuffer(int bufferSize) {
- checkArgument(bufferSize > 0);
-
- return new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), RECYCLER);
+ /**
+ * Creates a (network) buffer with unspecified data of the given size.
+ *
+ * @param bufferSize
+ * size of the buffer
+ * @param dataSize
+ * size of the data in the buffer, i.e. the new writer index
+ *
+ * @return a new buffer instance
+ */
+ public static Buffer createBuffer(int bufferSize, int dataSize) {
+ return new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
+ RECYCLER, true, dataSize);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index eb80578..eba5912 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.shaded.guava18.com.google.common.collect.Queues;
@@ -124,7 +125,7 @@ public class TestPooledBufferProvider implements BufferProvider {
@Override
public void recycle(MemorySegment segment) {
synchronized (listenerRegistrationLock) {
- final Buffer buffer = new Buffer(segment, this);
+ final Buffer buffer = new NetworkBuffer(segment, this);
BufferListener listener = registeredListeners.poll();
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 98037cb..7f6e875 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.util.StringUtils;
@@ -131,7 +132,7 @@ public class BufferSpiller {
ByteBuffer contents;
if (boe.isBuffer()) {
Buffer buf = boe.getBuffer();
- contents = buf.getMemorySegment().wrap(0, buf.getSize());
+ contents = buf.getNioBufferReadable();
}
else {
contents = EventSerializer.toSerializedEvent(boe.getEvent());
@@ -380,7 +381,7 @@ public class BufferSpiller {
}
}
- Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+ Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
buf.setSize(length);
return new BufferOrEvent(buf, channel);
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
index 3899fe1..cca6fb3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferAlignmentLimitTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -277,7 +278,7 @@ public class BarrierBufferAlignmentLimitTest {
MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
memory.put(0, bytes);
- Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+ Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
buf.setSize(size);
// retain an additional time so it does not get disposed after being read by the input gate
@@ -296,6 +297,7 @@ public class BarrierBufferAlignmentLimitTest {
assertEquals(expected.isBuffer(), present.isBuffer());
if (expected.isBuffer()) {
+ assertEquals(expected.getBuffer().getMaxCapacity(), present.getBuffer().getMaxCapacity());
assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
MemorySegment presentMem = present.getBuffer().getMemorySegment();
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index 57ecf3a..c7c9df2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -1406,7 +1407,7 @@ public class BarrierBufferTest {
MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
memory.put(0, bytes);
- Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+ Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);
buf.setSize(size);
// retain an additional time so it does not get disposed after being read by the input gate
@@ -1425,6 +1426,7 @@ public class BarrierBufferTest {
assertEquals(expected.isBuffer(), present.isBuffer());
if (expected.isBuffer()) {
+ assertEquals(expected.getBuffer().getMaxCapacity(), present.getBuffer().getMaxCapacity());
assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
MemorySegment presentMem = present.getBuffer().getMemorySegment();
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index 535fdb9..bcf821d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -480,7 +480,7 @@ public class BarrierTrackerTest {
private static BufferOrEvent createBuffer(int channel) {
return new BufferOrEvent(
- new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
+ new NetworkBuffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index 4edb665..ee58052 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.junit.After;
@@ -374,7 +375,7 @@ public class BufferSpillerTest {
seg.put(i, (byte) i);
}
- Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
+ Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE);
buf.setSize(size);
return new BufferOrEvent(buf, channelIndex);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 72f11ca..79e8b30 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1034,6 +1034,8 @@ under the License.
<exclude>flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/*.java</exclude>
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_csv</exclude>
<exclude>flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/data_text</exclude>
+ <!-- netty test file, still Apache License 2.0 but with a different header -->
+ <exclude>flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/AbstractByteBufTest.java</exclude>
<!-- Configuration Files. -->
<exclude>**/flink-bin/conf/slaves</exclude>
<exclude>**/flink-bin/conf/masters</exclude>
http://git-wip-us.apache.org/repos/asf/flink/blob/85bea23a/tools/maven/suppressions-runtime.xml
----------------------------------------------------------------------
diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml
index 16f444c..9d9cb87 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -93,6 +93,10 @@ under the License.
<suppress
files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\](api|buffer|netty|partition|serialization|util)[/\\](.*)"
checks="AvoidStarImport|UnusedImports"/>
+ <!--Test class copied from the netty project-->
+ <suppress
+ files="(.*)test[/\\](.*)runtime[/\\]io[/\\]network[/\\]buffer[/\\]AbstractByteBufTest.java"
+ checks="[a-zA-Z0-9]*"/>
<suppress
files="(.*)runtime[/\\]jobgraph[/\\](.*)"
checks="NewlineAtEndOfFile|RegexpSingleline|TodoComment|RedundantImport|ImportOrder|RedundantModifier|JavadocMethod|JavadocParagraph|JavadocType|JavadocStyle|PackageName|TypeNameCheck|ConstantNameCheck|StaticVariableNameCheck|MemberNameCheck|MethodNameCheck|ParameterName|LocalFinalVariableName|LocalVariableName|LeftCurly|UpperEll|FallThrough|reliefPattern|SimplifyBooleanExpression|EmptyStatement|ModifierOrder|EmptyLineSeparator|WhitespaceAround|WhitespaceAfter|NoWhitespaceAfter|NoWhitespaceBefore|OperatorWrap|ParenPad"/>