You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/01/08 13:04:04 UTC
[10/15] flink git commit: [FLINK-8178][network] Introduce not
threadsafe write only BufferBuilder
[FLINK-8178][network] Introduce not threadsafe write only BufferBuilder
Because the Buffer class is used in a multithreaded context, it requires synchronisation.
Previously this was misleading and unclear, suggesting that RecordSerializer should
take into account synchronisation of the Buffer that it is holding. With the NotThreadSafe
BufferBuilder there is now clear separation between single-threaded writing/creating
a BufferBuilder and multithreaded Buffer handling/retaining/recycling.
This increases the throughput of the network stack by a factor of 2, because previously
the method getMemorySegment() was called twice per record, and it is a synchronized
method on recycleLock — even though the RecordSerializer is the sole owner of the Buffer
at that point, so synchronisation is not needed.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6945c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6945c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6945c2e
Branch: refs/heads/master
Commit: c6945c2ef48d4c2cad3fc935435c1ab83e834969
Parents: 0888bb6
Author: Piotr Nowojski <pi...@gmail.com>
Authored: Tue Nov 28 16:49:37 2017 +0100
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Mon Jan 8 11:46:00 2018 +0100
----------------------------------------------------------------------
.../api/serialization/RecordSerializer.java | 17 +--
.../serialization/SpanningRecordSerializer.java | 57 ++++-----
.../io/network/api/writer/RecordWriter.java | 10 +-
.../flink/runtime/io/network/buffer/Buffer.java | 7 +-
.../io/network/buffer/BufferBuilder.java | 82 +++++++++++++
.../io/network/buffer/BufferProvider.java | 8 ++
.../io/network/buffer/LocalBufferPool.java | 15 ++-
.../SpanningRecordSerializationTest.java | 11 +-
.../SpanningRecordSerializerTest.java | 26 ++---
.../io/network/api/writer/RecordWriterTest.java | 37 ++++--
.../io/network/buffer/BufferBuilderTest.java | 115 +++++++++++++++++++
.../network/buffer/BufferBuilderTestUtils.java | 32 ++++++
.../IteratorWrappingTestSingleInputGate.java | 9 +-
.../network/serialization/LargeRecordsTest.java | 23 ++--
.../network/util/TestPooledBufferProvider.java | 7 ++
.../consumer/StreamTestSingleInputGate.java | 10 +-
16 files changed, 352 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index 5fe56c4..6a07f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -19,11 +19,12 @@
package org.apache.flink.runtime.io.network.api.serialization;
-import java.io.IOException;
-
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import java.io.IOException;
/**
* Interface for turning records into sequences of memory segments.
@@ -79,19 +80,19 @@ public interface RecordSerializer<T extends IOReadableWritable> {
* Sets a (next) target buffer to use and continues writing remaining data
* to it until it is full.
*
- * @param buffer the new target buffer to use
+ * @param bufferBuilder the new target buffer to use
* @return how much information was written to the target buffer and
* whether this buffer is full
* @throws IOException
*/
- SerializationResult setNextBuffer(Buffer buffer) throws IOException;
+ SerializationResult setNextBufferBuilder(BufferBuilder bufferBuilder) throws IOException;
/**
* Retrieves the current target buffer and sets its size to the actual
* number of written bytes.
*
* After calling this method, a new target buffer is required to continue
- * writing (see {@link #setNextBuffer(Buffer)}).
+ * writing (see {@link #setNextBufferBuilder(BufferBuilder)}).
*
* @return the target buffer that was used
*/
@@ -102,7 +103,7 @@ public interface RecordSerializer<T extends IOReadableWritable> {
*
* <p><strong>NOTE:</strong> After calling this method, <strong>a new target
* buffer is required to continue writing</strong> (see
- * {@link #setNextBuffer(Buffer)}).</p>
+ * {@link #setNextBufferBuilder(BufferBuilder)}).</p>
*/
void clearCurrentBuffer();
@@ -112,7 +113,7 @@ public interface RecordSerializer<T extends IOReadableWritable> {
*
* <p><strong>NOTE:</strong> After calling this method, a <strong>new record
* and a new target buffer is required to start writing again</strong>
- * (see {@link #setNextBuffer(Buffer)}). If you want to continue
+ * (see {@link #setNextBufferBuilder(BufferBuilder)}). If you want to continue
* with the current record, use {@link #clearCurrentBuffer()} instead.</p>
*/
void clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 7394f83..efdfaa1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -18,20 +18,23 @@
package org.apache.flink.runtime.io.network.api.serialization;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-
import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
/**
* Record serializer which serializes the complete record to an intermediate
* data serialization buffer and copies this buffer to target buffers
- * one-by-one using {@link #setNextBuffer(Buffer)}.
+ * one-by-one using {@link #setNextBufferBuilder(BufferBuilder)}.
*
* @param <T>
*/
@@ -50,13 +53,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
private final ByteBuffer lengthBuffer;
/** Current target {@link Buffer} of the serializer */
- private Buffer targetBuffer;
-
- /** Position in current {@link MemorySegment} of target buffer */
- private int position;
-
- /** Limit of current {@link MemorySegment} of target buffer */
- private int limit;
+ @Nullable
+ private BufferBuilder targetBuffer;
public SpanningRecordSerializer() {
serializationBuffer = new DataOutputSerializer(128);
@@ -64,7 +62,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
lengthBuffer = ByteBuffer.allocate(4);
lengthBuffer.order(ByteOrder.BIG_ENDIAN);
- // ensure initial state with hasRemaining false (for correct setNextBuffer logic)
+ // ensure initial state with hasRemaining false (for correct setNextBufferBuilder logic)
dataBuffer = serializationBuffer.wrapAsByteBuffer();
lengthBuffer.position(4);
}
@@ -105,10 +103,8 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
}
@Override
- public SerializationResult setNextBuffer(Buffer buffer) throws IOException {
+ public SerializationResult setNextBufferBuilder(BufferBuilder buffer) throws IOException {
targetBuffer = buffer;
- position = 0;
- limit = buffer.getSize();
if (lengthBuffer.hasRemaining()) {
copyToTargetBufferFrom(lengthBuffer);
@@ -140,19 +136,12 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
if (targetBuffer == null) {
return;
}
-
- int needed = source.remaining();
- int available = limit - position;
- int toCopy = Math.min(needed, available);
-
- targetBuffer.getMemorySegment().put(position, source, toCopy);
-
- position += toCopy;
+ targetBuffer.append(source);
}
private SerializationResult getSerializationResult() {
if (!dataBuffer.hasRemaining() && !lengthBuffer.hasRemaining()) {
- return (position < limit)
+ return !targetBuffer.isFull()
? SerializationResult.FULL_RECORD
: SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL;
}
@@ -165,25 +154,21 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
if (targetBuffer == null) {
return null;
}
-
- targetBuffer.setSize(position);
- return targetBuffer;
+ Buffer result = targetBuffer.build();
+ targetBuffer = null;
+ return result;
}
@Override
public void clearCurrentBuffer() {
targetBuffer = null;
- position = 0;
- limit = 0;
}
@Override
public void clear() {
targetBuffer = null;
- position = 0;
- limit = 0;
- // ensure clear state with hasRemaining false (for correct setNextBuffer logic)
+ // ensure clear state with hasRemaining false (for correct setNextBufferBuilder logic)
dataBuffer.position(dataBuffer.limit());
lengthBuffer.position(4);
}
@@ -191,7 +176,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
@Override
public boolean hasData() {
// either data in current target buffer or intermediate buffers
- return position > 0 || (lengthBuffer.hasRemaining() || dataBuffer.hasRemaining());
+ return (targetBuffer != null && !targetBuffer.isEmpty()) || lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 4729800..39dbacc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.XORShiftRandom;
import java.io.IOException;
@@ -129,8 +130,9 @@ public class RecordWriter<T extends IOReadableWritable> {
break;
}
} else {
- buffer = targetPartition.getBufferProvider().requestBufferBlocking();
- result = serializer.setNextBuffer(buffer);
+ BufferBuilder bufferBuilder =
+ targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+ result = serializer.setNextBufferBuilder(bufferBuilder);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index d7980d2..33516bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -54,11 +54,14 @@ public class Buffer {
}
public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer) {
+ this(memorySegment, recycler, isBuffer, memorySegment.size());
+ }
+
+ public Buffer(MemorySegment memorySegment, BufferRecycler recycler, boolean isBuffer, int size) {
this.memorySegment = checkNotNull(memorySegment);
this.recycler = checkNotNull(recycler);
this.isBuffer = isBuffer;
-
- this.currentSize = memorySegment.size();
+ this.currentSize = size;
}
public boolean isBuffer() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
new file mode 100644
index 0000000..08e49b5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Not thread safe class for filling in the initial content of the {@link Buffer}. Once writing to the builder
+ * is complete, {@link Buffer} instance can be built and shared across multiple threads.
+ */
+@NotThreadSafe
+public class BufferBuilder {
+ private final MemorySegment memorySegment;
+
+ private final BufferRecycler recycler;
+
+ private int position = 0;
+
+ private boolean built = false;
+
+ public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
+ this.memorySegment = checkNotNull(memorySegment);
+ this.recycler = checkNotNull(recycler);
+ }
+
+ /**
+ * @return number of copied bytes
+ */
+ public int append(ByteBuffer source) {
+ checkState(!built);
+
+ int needed = source.remaining();
+ int available = limit() - position;
+ int toCopy = Math.min(needed, available);
+
+ memorySegment.put(position, source, toCopy);
+ position += toCopy;
+ return toCopy;
+ }
+
+ public boolean isFull() {
+ checkState(position <= limit());
+ return position == limit();
+ }
+
+ public Buffer build() {
+ checkState(!built);
+ built = true;
+ return new Buffer(memorySegment, recycler, true, position);
+ }
+
+ public boolean isEmpty() {
+ return position == 0;
+ }
+
+ private int limit() {
+ return memorySegment.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
index 9782584..843a2f6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
@@ -44,6 +44,14 @@ public interface BufferProvider {
Buffer requestBufferBlocking() throws IOException, InterruptedException;
/**
+ * Returns a {@link BufferBuilder} instance from the buffer provider.
+ *
+ * <p>If there is no buffer available, the call will block until one becomes available again or the
+ * buffer provider has been destroyed.
+ */
+ BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException;
+
+ /**
* Adds a buffer availability listener to the buffer provider.
*
* <p>The operation fails with return value <code>false</code>, when there is a buffer available or
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index a66373c..7403bd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -179,7 +179,8 @@ class LocalBufferPool implements BufferPool {
@Override
public Buffer requestBuffer() throws IOException {
try {
- return requestBuffer(false);
+ BufferBuilder bufferBuilder = requestBufferBuilder(false);
+ return bufferBuilder != null ? bufferBuilder.build() : null;
}
catch (InterruptedException e) {
throw new IOException(e);
@@ -188,10 +189,16 @@ class LocalBufferPool implements BufferPool {
@Override
public Buffer requestBufferBlocking() throws IOException, InterruptedException {
- return requestBuffer(true);
+ BufferBuilder bufferBuilder = requestBufferBuilder(true);
+ return bufferBuilder != null ? bufferBuilder.build() : null;
}
- private Buffer requestBuffer(boolean isBlocking) throws InterruptedException, IOException {
+ @Override
+ public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+ return requestBufferBuilder(true);
+ }
+
+ private BufferBuilder requestBufferBuilder(boolean isBlocking) throws InterruptedException, IOException {
synchronized (availableMemorySegments) {
returnExcessMemorySegments();
@@ -226,7 +233,7 @@ class LocalBufferPool implements BufferPool {
}
}
- return new Buffer(availableMemorySegments.poll(), this);
+ return new BufferBuilder(availableMemorySegments.poll(), this);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index f988c55..ed0ce6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,19 +19,16 @@
package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayDeque;
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
public class SpanningRecordSerializationTest {
@@ -129,13 +126,11 @@ public class SpanningRecordSerializationTest {
{
final int SERIALIZATION_OVERHEAD = 4; // length encoding
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
-
final ArrayDeque<SerializationTestType> serializedRecords = new ArrayDeque<SerializationTestType>();
// -------------------------------------------------------------------------------------------------------------
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
int numBytes = 0;
int numRecords = 0;
@@ -164,7 +159,7 @@ public class SpanningRecordSerializationTest {
}
}
- while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+ while (serializer.setNextBufferBuilder(createBufferBuilder(segmentSize)).isFullBuffer()) {
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), segmentSize);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
index fe9a386..ed6677e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializerTest.java
@@ -21,19 +21,17 @@ package org.apache.flink.runtime.io.network.api.serialization;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Random;
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
public class SpanningRecordSerializerTest {
@@ -42,7 +40,6 @@ public class SpanningRecordSerializerTest {
final int SEGMENT_SIZE = 16;
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
final SerializationTestType randomIntRecord = Util.randomRecord(SerializationTestTypeFactory.INT);
Assert.assertFalse(serializer.hasData());
@@ -51,13 +48,13 @@ public class SpanningRecordSerializerTest {
serializer.addRecord(randomIntRecord);
Assert.assertTrue(serializer.hasData());
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
Assert.assertTrue(serializer.hasData());
serializer.clear();
Assert.assertFalse(serializer.hasData());
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
serializer.addRecord(randomIntRecord);
Assert.assertTrue(serializer.hasData());
@@ -76,10 +73,11 @@ public class SpanningRecordSerializerTest {
final int SEGMENT_SIZE = 11;
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
try {
- Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, serializer.setNextBuffer(buffer));
+ Assert.assertEquals(
+ RecordSerializer.SerializationResult.FULL_RECORD,
+ serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)));
} catch (IOException e) {
e.printStackTrace();
}
@@ -122,7 +120,7 @@ public class SpanningRecordSerializerTest {
result = serializer.addRecord(emptyRecord);
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
- result = serializer.setNextBuffer(buffer);
+ result = serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
}
catch (Exception e) {
@@ -202,11 +200,10 @@ public class SpanningRecordSerializerTest {
final int SERIALIZATION_OVERHEAD = 4; // length encoding
final SpanningRecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(segmentSize), mock(BufferRecycler.class));
// -------------------------------------------------------------------------------------------------------------
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
int numBytes = 0;
for (SerializationTestType record : records) {
@@ -217,14 +214,15 @@ public class SpanningRecordSerializerTest {
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD, result);
} else if (numBytes == segmentSize) {
Assert.assertEquals(RecordSerializer.SerializationResult.FULL_RECORD_MEMORY_SEGMENT_FULL, result);
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
numBytes = 0;
} else {
Assert.assertEquals(RecordSerializer.SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL, result);
while (result.isFullBuffer()) {
numBytes -= segmentSize;
- result = serializer.setNextBuffer(buffer);
+
+ result = serializer.setNextBufferBuilder(createBufferBuilder(segmentSize));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 59b98a2..f0eaa94 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -31,11 +31,12 @@ import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer.SerializationResult;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
@@ -54,6 +55,8 @@ import org.powermock.modules.junit4.PowerMockRunner;
import java.io.IOException;
import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -70,7 +73,6 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -97,16 +99,18 @@ public class RecordWriterTest {
final CountDownLatch sync = new CountDownLatch(2);
- final Buffer buffer = spy(TestBufferFactory.createBuffer(4));
+ final TrackingBufferRecycler recycler = new TrackingBufferRecycler();
+
+ final MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(4);
// Return buffer for first request, but block for all following requests.
- Answer<Buffer> request = new Answer<Buffer>() {
+ Answer<BufferBuilder> request = new Answer<BufferBuilder>() {
@Override
- public Buffer answer(InvocationOnMock invocation) throws Throwable {
+ public BufferBuilder answer(InvocationOnMock invocation) throws Throwable {
sync.countDown();
if (sync.getCount() == 1) {
- return buffer;
+ return new BufferBuilder(memorySegment, recycler);
}
final Object o = new Object();
@@ -119,7 +123,7 @@ public class RecordWriterTest {
};
BufferProvider bufferProvider = mock(BufferProvider.class);
- when(bufferProvider.requestBufferBlocking()).thenAnswer(request);
+ when(bufferProvider.requestBufferBuilderBlocking()).thenAnswer(request);
ResultPartitionWriter partitionWriter = createResultPartitionWriter(bufferProvider);
@@ -156,13 +160,13 @@ public class RecordWriterTest {
recordWriter.clearBuffers();
// Verify that buffer have been requested, but only one has been written out.
- verify(bufferProvider, times(2)).requestBufferBlocking();
+ verify(bufferProvider, times(2)).requestBufferBuilderBlocking();
verify(partitionWriter, times(1)).writeBuffer(any(Buffer.class), anyInt());
// Verify that the written out buffer has only been recycled once
// (by the partition writer).
- assertTrue("Buffer not recycled.", buffer.isRecycled());
- verify(buffer, times(1)).recycle();
+ assertEquals(1, recycler.getRecycledMemorySegments().size());
+ assertEquals(memorySegment, recycler.getRecycledMemorySegments().get(0));
}
finally {
if (executor != null) {
@@ -566,4 +570,17 @@ public class RecordWriterTest {
return nextChannel;
}
}
+
+ private static class TrackingBufferRecycler implements BufferRecycler {
+ private final ArrayList<MemorySegment> recycledMemorySegments = new ArrayList<>();
+
+ @Override
+ public synchronized void recycle(MemorySegment memorySegment) {
+ recycledMemorySegments.add(memorySegment);
+ }
+
+ public synchronized List<MemorySegment> getRecycledMemorySegments() {
+ return recycledMemorySegments;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
new file mode 100644
index 0000000..3805274
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.testutils.DiscardingRecycler;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link BufferBuilder}.
+ */
+public class BufferBuilderTest {
+ private static final int BUFFER_SIZE = 10 * Integer.BYTES;
+
+ @Test
+ public void append() {
+ BufferBuilder bufferBuilder = createBufferBuilder();
+ int[] intsToWrite = new int[] {0, 1, 2, 3, 42};
+ ByteBuffer bytesToWrite = toByteBuffer(intsToWrite);
+
+ assertEquals(bytesToWrite.limit(), bufferBuilder.append(bytesToWrite));
+
+ assertEquals(bytesToWrite.limit(), bytesToWrite.position());
+ assertFalse(bufferBuilder.isFull());
+ Buffer buffer = bufferBuilder.build();
+ assertBufferContent(buffer, intsToWrite);
+ assertEquals(5 * Integer.BYTES, buffer.getSize());
+ assertEquals(DiscardingRecycler.INSTANCE, buffer.getRecycler());
+ }
+
+ @Test
+ public void multipleAppends() {
+ BufferBuilder bufferBuilder = createBufferBuilder();
+
+ bufferBuilder.append(toByteBuffer(0, 1));
+ bufferBuilder.append(toByteBuffer(2));
+ bufferBuilder.append(toByteBuffer(3, 42));
+
+ Buffer buffer = bufferBuilder.build();
+ assertBufferContent(buffer, 0, 1, 2, 3, 42);
+ assertEquals(5 * Integer.BYTES, buffer.getSize());
+ }
+
+ @Test
+ public void appendOverSize() {
+ BufferBuilder bufferBuilder = createBufferBuilder();
+ ByteBuffer bytesToWrite = toByteBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 42);
+
+ assertEquals(BUFFER_SIZE, bufferBuilder.append(bytesToWrite));
+
+ assertTrue(bufferBuilder.isFull());
+ Buffer buffer = bufferBuilder.build();
+ assertBufferContent(buffer, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+ assertEquals(BUFFER_SIZE, buffer.getSize());
+
+ bufferBuilder = createBufferBuilder();
+ assertEquals(Integer.BYTES, bufferBuilder.append(bytesToWrite));
+
+ assertFalse(bufferBuilder.isFull());
+ buffer = bufferBuilder.build();
+ assertBufferContent(buffer, 42);
+ assertEquals(Integer.BYTES, buffer.getSize());
+ }
+
+ @Test
+ public void buildEmptyBuffer() {
+ Buffer buffer = createBufferBuilder().build();
+ assertEquals(0, buffer.getSize());
+ assertBufferContent(buffer);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void buildingBufferTwice() {
+ BufferBuilder bufferBuilder = createBufferBuilder();
+ bufferBuilder.build();
+ bufferBuilder.build();
+ }
+
+ private static ByteBuffer toByteBuffer(int... data) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(data.length * Integer.BYTES);
+ byteBuffer.asIntBuffer().put(data);
+ return byteBuffer;
+ }
+
+ private static void assertBufferContent(Buffer actualBuffer, int... expected) {
+ assertEquals(toByteBuffer(expected), actualBuffer.getNioBuffer());
+ }
+
+ private static BufferBuilder createBufferBuilder() {
+ return new BufferBuilder(MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE), DiscardingRecycler.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
new file mode 100644
index 0000000..1113664
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.buffer;
+
+import org.apache.flink.core.memory.MemorySegmentFactory;
+
+/**
+ * Utility class for creating non-pooled {@link BufferBuilder}s.
+ */
+public class BufferBuilderTestUtils {
+ public static BufferBuilder createBufferBuilder(int size) {
+ return new BufferBuilder(
+ MemorySegmentFactory.allocateUnpooledSegment(size),
+ FreeingBufferRecycler.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index fa44393..16285b7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -19,22 +19,20 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
+
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import static org.mockito.Mockito.mock;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.mockito.Mockito.when;
public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> extends TestSingleInputGate {
@@ -71,8 +69,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
@Override
public InputChannel.BufferAndAvailability answer(InvocationOnMock invocationOnMock) throws Throwable {
if (hasData) {
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(bufferSize), mock(BufferRecycler.class));
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
serializer.addRecord(reuse);
hasData = inputIterator.next(reuse) != null;
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
index d596863..057b917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -18,28 +18,27 @@
package org.apache.flink.runtime.io.network.serialization;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.testutils.serialization.types.IntType;
-import org.apache.flink.testutils.serialization.types.SerializationTestType;
+import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import org.apache.flink.testutils.serialization.types.IntType;
+import org.apache.flink.testutils.serialization.types.SerializationTestType;
+
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
public class LargeRecordsTest {
@@ -52,8 +51,6 @@ public class LargeRecordsTest {
final RecordSerializer<SerializationTestType> serializer = new SpanningRecordSerializer<SerializationTestType>();
final RecordDeserializer<SerializationTestType> deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
-
List<SerializationTestType> originalRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
List<SerializationTestType> deserializedRecords = new ArrayList<SerializationTestType>((NUM_RECORDS + 1) / 2);
@@ -73,7 +70,7 @@ public class LargeRecordsTest {
// -------------------------------------------------------------------------------------------------------------
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
int numRecordsDeserialized = 0;
@@ -98,7 +95,7 @@ public class LargeRecordsTest {
}
// move buffers as long as necessary (for long records)
- while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+ while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) {
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE);
}
@@ -152,8 +149,6 @@ public class LargeRecordsTest {
new SpillingAdaptiveSpanningRecordDeserializer<SerializationTestType>(
new String[] { System.getProperty("java.io.tmpdir") } );
- final Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(SEGMENT_SIZE), mock(BufferRecycler.class));
-
List<SerializationTestType> originalRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
List<SerializationTestType> deserializedRecords = new ArrayList<>((NUM_RECORDS + 1) / 2);
@@ -173,7 +168,7 @@ public class LargeRecordsTest {
// -------------------------------------------------------------------------------------------------------------
- serializer.setNextBuffer(buffer);
+ serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE));
int numRecordsDeserialized = 0;
@@ -198,7 +193,7 @@ public class LargeRecordsTest {
}
// move buffers as long as necessary (for long records)
- while (serializer.setNextBuffer(buffer).isFullBuffer()) {
+ while (serializer.setNextBufferBuilder(createBufferBuilder(SEGMENT_SIZE)).isFullBuffer()) {
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(), SEGMENT_SIZE);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
index cc52549..eb80578 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestPooledBufferProvider.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.io.network.util;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferListener;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
@@ -79,6 +80,12 @@ public class TestPooledBufferProvider implements BufferProvider {
}
@Override
+ public BufferBuilder requestBufferBuilderBlocking() throws IOException, InterruptedException {
+ Buffer buffer = requestBufferBlocking();
+ return new BufferBuilder(buffer.getMemorySegment(), buffer.getRecycler());
+ }
+
+ @Override
public boolean addBufferListener(BufferListener listener) {
return bufferRecycler.registerListener(listener);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c6945c2e/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index e14430e..f19d59d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -21,14 +21,11 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
@@ -41,8 +38,8 @@ import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
@@ -104,11 +101,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
return new BufferAndAvailability(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), false);
} else if (input != null && input.isStreamRecord()) {
Object inputElement = input.getStreamRecord();
- final Buffer buffer = new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
- recordSerializer.setNextBuffer(buffer);
+ recordSerializer.setNextBufferBuilder(createBufferBuilder(bufferSize));
delegate.setInstance(inputElement);
recordSerializer.addRecord(delegate);