You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/15 02:05:16 UTC
[flink] 02/02: [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dfdfdadf445ea055c841c526b1a382424e1e1865
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Jun 12 10:03:59 2020 +0200
[FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.
This is a partial revert of FLINK-10995.
---
.../runtime/io/network/api/writer/RecordWriter.java | 5 +++++
.../flink/runtime/io/network/buffer/BufferBuilder.java | 4 ++++
.../io/network/api/writer/RecordWriterDelegateTest.java | 10 ++++++++--
.../runtime/io/network/api/writer/RecordWriterTest.java | 5 +++--
.../io/network/buffer/BufferBuilderAndConsumerTest.java | 16 +++-------------
5 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 7d0f7de..be40d8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -349,4 +349,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
}
}
}
+
+ @VisibleForTesting
+ ResultPartitionWriter getTargetPartition() {
+ return targetPartition;
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 7780ba8..2cb873c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -41,6 +41,8 @@ public class BufferBuilder {
private final SettablePositionMarker positionMarker = new SettablePositionMarker();
+ private boolean bufferConsumerCreated = false;
+
public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
this.memorySegment = checkNotNull(memorySegment);
this.recycler = checkNotNull(recycler);
@@ -53,6 +55,8 @@ public class BufferBuilder {
* @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}.
*/
public BufferConsumer createBufferConsumer() {
+ checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder");
+ bufferConsumerCreated = true;
return new BufferConsumer(
memorySegment,
recycler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 97d8053..4a7e5c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.util.TestLogger;
@@ -148,13 +150,17 @@ public class RecordWriterDelegateTest extends TestLogger {
assertTrue(writerDelegate.getAvailableFuture().isDone());
// request one buffer from the local pool to make it unavailable
- final BufferBuilder bufferBuilder = checkNotNull(writerDelegate.getRecordWriter(0).getBufferBuilder(0));
+ RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
+ final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
assertFalse(writerDelegate.isAvailable());
CompletableFuture future = writerDelegate.getAvailableFuture();
assertFalse(future.isDone());
// recycle the buffer to make the local pool available again
- final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
+ BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
+ ResultSubpartitionView readView = recordWriter.getTargetPartition().getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+ Buffer buffer = readView.getNextBuffer().buffer();
+
buffer.recycleBuffer();
assertTrue(future.isDone());
assertTrue(writerDelegate.isAvailable());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 84ec497..6e75dff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -519,8 +519,9 @@ public class RecordWriterTest {
new NoOpResultPartitionConsumableNotifier());
final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
- final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(builder);
- builder.finish();
+ BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
+ ResultSubpartitionView readView = resultPartition.getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+ Buffer buffer = readView.getNextBuffer().buffer();
// idle time is zero when there is buffer available.
assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index 3900bb0..124e845 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -119,21 +119,11 @@ public class BufferBuilderAndConsumerTest {
assertContent(bufferConsumer, 42);
}
- @Test
+ @Test(expected = IllegalStateException.class)
public void creatingBufferConsumerTwice() {
BufferBuilder bufferBuilder = createBufferBuilder();
- BufferConsumer bufferConsumer1 = bufferBuilder.createBufferConsumer();
-
- assertEquals(0, bufferConsumer1.getCurrentReaderPosition());
- assertContent(bufferConsumer1);
-
- ByteBuffer bytesToWrite = toByteBuffer(0, 1);
- bufferBuilder.appendAndCommit(bytesToWrite);
- BufferConsumer bufferConsumer2 = bufferBuilder.createBufferConsumer();
- bufferBuilder.appendAndCommit(toByteBuffer(2));
-
- assertEquals(bytesToWrite.position(), bufferConsumer2.getCurrentReaderPosition());
- assertContent(bufferConsumer2, 2);
+ bufferBuilder.createBufferConsumer();
+ bufferBuilder.createBufferConsumer();
}
@Test