You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:17 UTC

[flink] 06/07: [FLINK-19302][network] Fix flushing BoundedBlockingResultPartition

This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8ec4f1d8d2c3b2f1273d529cd67513c2f68b3656
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Thu Sep 3 13:35:10 2020 +0800

    [FLINK-19302][network] Fix flushing BoundedBlockingResultPartition
    
    Currently, when flushing the BoundedBlockingSubpartition, the unfinished BufferConsumer will be closed and recycled, however the corresponding BufferBuilder is not finished and the writer can keep copying records to it, which can lead to loss of data. This patch fixes the issue by finishing the corresponding BufferBuilders first when flushing a BoundedBlockingResultPartition.
---
 .../partition/BoundedBlockingResultPartition.java  | 10 ++++++
 .../partition/BufferWritingResultPartition.java    | 24 ++++++++-----
 .../partition/PipelinedResultPartition.java        | 10 ++++++
 .../io/network/partition/ResultPartitionTest.java  | 41 +++++++++++++++++++++-
 4 files changed, 76 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
index a16cfe2..b98b568 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
@@ -63,6 +63,16 @@ public class BoundedBlockingResultPartition extends BufferWritingResultPartition
 			bufferPoolFactory);
 	}
 
+	@Override
+	public void flush(int targetSubpartition) {
+		flushSubpartition(targetSubpartition, true);
+	}
+
+	@Override
+	public void flushAll() {
+		flushAllSubpartitions(true);
+	}
+
 	private static ResultPartitionType checkResultPartitionType(ResultPartitionType type) {
 		checkArgument(type == ResultPartitionType.BLOCKING || type == ResultPartitionType.BLOCKING_PERSISTENT);
 		return type;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
index bd9e8ad..dac509e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -51,7 +51,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
  * transported through the network.
  */
-public class BufferWritingResultPartition extends ResultPartition {
+public abstract class BufferWritingResultPartition extends ResultPartition {
 
 	/** The subpartitions of this partition. At least one. */
 	protected final ResultSubpartition[] subpartitions;
@@ -105,18 +105,26 @@ public class BufferWritingResultPartition extends ResultPartition {
 		return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
 	}
 
-	@Override
-	public void flushAll() {
-		for (ResultSubpartition subpartition : subpartitions) {
-			subpartition.flush();
+	protected void flushSubpartition(int targetSubpartition, boolean finishProducers) {
+		if (finishProducers) {
+			finishBroadcastBufferBuilder();
+			finishSubpartitionBufferBuilder(targetSubpartition);
 		}
-	}
 
-	@Override
-	public void flush(int targetSubpartition) {
 		subpartitions[targetSubpartition].flush();
 	}
 
+	protected void flushAllSubpartitions(boolean finishProducers) {
+		if (finishProducers) {
+			finishBroadcastBufferBuilder();
+			finishSubpartitionBufferBuilders();
+		}
+
+		for (ResultSubpartition subpartition : subpartitions) {
+			subpartition.flush();
+		}
+	}
+
 	public void emitRecord(ByteBuffer record, int targetSubpartition) throws IOException {
 		do {
 			final BufferBuilder bufferBuilder = getSubpartitionBufferBuilder(targetSubpartition);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index a1888c1..236dde0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -147,6 +147,16 @@ public class PipelinedResultPartition extends BufferWritingResultPartition
 	}
 
 	@Override
+	public void flushAll() {
+		flushAllSubpartitions(false);
+	}
+
+	@Override
+	public void flush(int targetSubpartition) {
+		flushSubpartition(targetSubpartition, false);
+	}
+
+	@Override
 	@SuppressWarnings("FieldAccessNotGuarded")
 	public String toString() {
 		return "PipelinedResultPartition " + partitionId.toString() + " [" + partitionType + ", "
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index d221e99..b261b18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -449,7 +449,7 @@ public class ResultPartitionTest {
 		NettyShuffleEnvironment network = new NettyShuffleEnvironmentBuilder()
 			.setNumNetworkBuffers(10)
 			.setBufferSize(bufferSize).build();
-		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 1);
+		ResultPartition resultPartition = createPartition(network, fileChannelManager, partitionType, 2);
 		resultPartition.setup();
 		return (BufferWritingResultPartition) resultPartition;
 	}
@@ -611,6 +611,45 @@ public class ResultPartitionTest {
 		assertNotNull(readView.getNextBuffer().buffer());
 	}
 
+	@Test
+	public void testFlushBoundedBlockingResultPartition() throws IOException {
+		int value = 1024;
+		ResultPartition partition = createResultPartition(ResultPartitionType.BLOCKING);
+
+		ByteBuffer record = ByteBuffer.allocate(4);
+		record.putInt(value);
+
+		record.rewind();
+		partition.emitRecord(record, 0);
+		partition.flush(0);
+
+		record.rewind();
+		partition.emitRecord(record, 0);
+
+		record.rewind();
+		partition.broadcastRecord(record);
+		partition.flushAll();
+
+		record.rewind();
+		partition.broadcastRecord(record);
+		partition.finish();
+		record.rewind();
+
+		ResultSubpartitionView readView1 = partition.createSubpartitionView(0, new NoOpBufferAvailablityListener());
+		for (int i = 0; i < 4; ++i) {
+			assertEquals(record, readView1.getNextBuffer().buffer().getNioBufferReadable());
+		}
+		assertFalse(readView1.getNextBuffer().buffer().isBuffer());
+		assertNull(readView1.getNextBuffer());
+
+		ResultSubpartitionView readView2 = partition.createSubpartitionView(1, new NoOpBufferAvailablityListener());
+		for (int i = 0; i < 2; ++i) {
+			assertEquals(record, readView2.getNextBuffer().buffer().getNioBufferReadable());
+		}
+		assertFalse(readView2.getNextBuffer().buffer().isBuffer());
+		assertNull(readView2.getNextBuffer());
+	}
+
 	/**
 	 * The {@link ChannelStateReader} instance for restoring the specific number of states.
 	 */