You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/06/15 02:05:16 UTC

[flink] 02/02: [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfdfdadf445ea055c841c526b1a382424e1e1865
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Fri Jun 12 10:03:59 2020 +0200

    [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.
    
    This is a partial revert of FLINK-10995.
---
 .../runtime/io/network/api/writer/RecordWriter.java      |  5 +++++
 .../flink/runtime/io/network/buffer/BufferBuilder.java   |  4 ++++
 .../io/network/api/writer/RecordWriterDelegateTest.java  | 10 ++++++++--
 .../runtime/io/network/api/writer/RecordWriterTest.java  |  5 +++--
 .../io/network/buffer/BufferBuilderAndConsumerTest.java  | 16 +++-------------
 5 files changed, 23 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 7d0f7de..be40d8d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -349,4 +349,9 @@ public abstract class RecordWriter<T extends IOReadableWritable> implements Avai
 			}
 		}
 	}
+
+	@VisibleForTesting
+	ResultPartitionWriter getTargetPartition() {
+		return targetPartition;
+	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 7780ba8..2cb873c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -41,6 +41,8 @@ public class BufferBuilder {
 
 	private final SettablePositionMarker positionMarker = new SettablePositionMarker();
 
+	private boolean bufferConsumerCreated = false;
+
 	public BufferBuilder(MemorySegment memorySegment, BufferRecycler recycler) {
 		this.memorySegment = checkNotNull(memorySegment);
 		this.recycler = checkNotNull(recycler);
@@ -53,6 +55,8 @@ public class BufferBuilder {
 	 * @return created matching instance of {@link BufferConsumer} to this {@link BufferBuilder}.
 	 */
 	public BufferConsumer createBufferConsumer() {
+		checkState(!bufferConsumerCreated, "Two BufferConsumer shouldn't exist for one BufferBuilder");
+		bufferConsumerCreated = true;
 		return new BufferConsumer(
 			memorySegment,
 			recycler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 97d8053..4a7e5c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -25,7 +25,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.util.TestLogger;
@@ -148,13 +150,17 @@ public class RecordWriterDelegateTest extends TestLogger {
 		assertTrue(writerDelegate.getAvailableFuture().isDone());
 
 		// request one buffer from the local pool to make it unavailable
-		final BufferBuilder bufferBuilder = checkNotNull(writerDelegate.getRecordWriter(0).getBufferBuilder(0));
+		RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
+		final BufferBuilder bufferBuilder = checkNotNull(recordWriter.getBufferBuilder(0));
 		assertFalse(writerDelegate.isAvailable());
 		CompletableFuture future = writerDelegate.getAvailableFuture();
 		assertFalse(future.isDone());
 
 		// recycle the buffer to make the local pool available again
-		final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(bufferBuilder);
+		BufferBuilderTestUtils.fillBufferBuilder(bufferBuilder, 1).finish();
+		ResultSubpartitionView readView = recordWriter.getTargetPartition().getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+		Buffer buffer = readView.getNextBuffer().buffer();
+
 		buffer.recycleBuffer();
 		assertTrue(future.isDone());
 		assertTrue(writerDelegate.isAvailable());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 84ec497..6e75dff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -519,8 +519,9 @@ public class RecordWriterTest {
 			new NoOpResultPartitionConsumableNotifier());
 		final RecordWriter recordWriter = createRecordWriter(partitionWrapper);
 		BufferBuilder builder = recordWriter.requestNewBufferBuilder(0);
-		final Buffer buffer = BufferBuilderTestUtils.buildSingleBuffer(builder);
-		builder.finish();
+		BufferBuilderTestUtils.fillBufferBuilder(builder, 1).finish();
+		ResultSubpartitionView readView = resultPartition.getSubpartition(0).createReadView(new NoOpBufferAvailablityListener());
+		Buffer buffer = readView.getNextBuffer().buffer();
 
 		// idle time is zero when there is buffer available.
 		assertEquals(0, recordWriter.getIdleTimeMsPerSecond().getCount());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
index 3900bb0..124e845 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderAndConsumerTest.java
@@ -119,21 +119,11 @@ public class BufferBuilderAndConsumerTest {
 		assertContent(bufferConsumer, 42);
 	}
 
-	@Test
+	@Test(expected = IllegalStateException.class)
 	public void creatingBufferConsumerTwice() {
 		BufferBuilder bufferBuilder = createBufferBuilder();
-		BufferConsumer bufferConsumer1 = bufferBuilder.createBufferConsumer();
-
-		assertEquals(0, bufferConsumer1.getCurrentReaderPosition());
-		assertContent(bufferConsumer1);
-
-		ByteBuffer bytesToWrite = toByteBuffer(0, 1);
-		bufferBuilder.appendAndCommit(bytesToWrite);
-		BufferConsumer bufferConsumer2 = bufferBuilder.createBufferConsumer();
-		bufferBuilder.appendAndCommit(toByteBuffer(2));
-
-		assertEquals(bytesToWrite.position(), bufferConsumer2.getCurrentReaderPosition());
-		assertContent(bufferConsumer2, 2);
+		bufferBuilder.createBufferConsumer();
+		bufferBuilder.createBufferConsumer();
 	}
 
 	@Test