You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/31 20:05:37 UTC

[flink] branch master updated: [FLINK-12564][network] Remove ResultPartitionWriter#getBufferProvider()

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c53c446  [FLINK-12564][network] Remove ResultPartitionWriter#getBufferProvider()
c53c446 is described below

commit c53c446486d58e3db149a9ea6fe1984227e415b2
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Sat Jun 1 04:05:25 2019 +0800

    [FLINK-12564][network] Remove ResultPartitionWriter#getBufferProvider()
    
    * [FLINK-12564][network] Refactor the method of getBufferProvider to getBufferBuilder in ResultPartitionWriter
    
    ResultPartitionWriter#getBufferProvider seems not very general for all the writer implementations. The key point is to request a BufferBuilder
    from the BufferProvider, so this method is refactored into getBufferBuilder directly. Then the internal components of ResultPartitionWriter instance
    would not be exposed to outside.
    
    * [fixup] Remove getBufferProvider from ResultPartition
---
 .../runtime/io/network/api/writer/RecordWriter.java  |  2 +-
 .../io/network/api/writer/ResultPartitionWriter.java |  9 ++++++---
 .../io/network/partition/ResultPartition.java        | 14 ++++++++------
 .../AbstractCollectingResultPartitionWriter.java     | 11 ++++++-----
 .../io/network/api/writer/RecordWriterTest.java      | 20 ++++++++++----------
 .../partition/PartialConsumePipelinedResultTest.java |  2 +-
 .../partition/consumer/LocalInputChannelTest.java    |  2 +-
 7 files changed, 33 insertions(+), 27 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 1743576..cc40df0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -253,7 +253,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
 		checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].get().isFinished());
 
-		BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+		BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
 		bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
 		targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
 		return bufferBuilder;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 153b880..cc1e49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.io.network.api.writer;
 
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 import javax.annotation.Nullable;
@@ -36,8 +36,6 @@ public interface ResultPartitionWriter extends AutoCloseable {
 	 */
 	void setup() throws IOException;
 
-	BufferProvider getBufferProvider();
-
 	ResultPartitionID getPartitionId();
 
 	int getNumberOfSubpartitions();
@@ -45,6 +43,11 @@ public interface ResultPartitionWriter extends AutoCloseable {
 	int getNumTargetKeyGroups();
 
 	/**
+	 * Requests a {@link BufferBuilder} from this partition for writing data.
+	 */
+	BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
+
+	/**
 	 * Adds the bufferConsumer to the subpartition with the given index.
 	 *
 	 * <p>For PIPELINED {@link org.apache.flink.runtime.io.network.partition.ResultPartitionType}s,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 15f15e9..fef0278 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -188,11 +188,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 		return subpartitions.length;
 	}
 
-	@Override
-	public BufferProvider getBufferProvider() {
-		return bufferPool;
-	}
-
 	public BufferPool getBufferPool() {
 		return bufferPool;
 	}
@@ -219,6 +214,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
 	// ------------------------------------------------------------------------
 
 	@Override
+	public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+		checkInProduceState();
+
+		return bufferPool.requestBufferBuilderBlocking();
+	}
+
+	@Override
 	public void addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
 		checkNotNull(bufferConsumer);
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 8ae8f5e..8633fe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -49,11 +50,6 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 	}
 
 	@Override
-	public BufferProvider getBufferProvider() {
-		return bufferProvider;
-	}
-
-	@Override
 	public ResultPartitionID getPartitionId() {
 		return new ResultPartitionID();
 	}
@@ -69,6 +65,11 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
 	}
 
 	@Override
+	public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+		return bufferProvider.requestBufferBuilderBlocking();
+	}
+
+	@Override
 	public synchronized void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
 		checkState(targetChannel < getNumberOfSubpartitions());
 		bufferConsumers.add(bufferConsumer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 35487b8..f8c6fdd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -480,11 +480,6 @@ public class RecordWriterTest {
 		}
 
 		@Override
-		public BufferProvider getBufferProvider() {
-			return bufferProvider;
-		}
-
-		@Override
 		public ResultPartitionID getPartitionId() {
 			return partitionId;
 		}
@@ -500,6 +495,11 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+			return bufferProvider.requestBufferBuilderBlocking();
+		}
+
+		@Override
 		public void addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException {
 			queues[targetChannel].add(buffer);
 		}
@@ -555,11 +555,6 @@ public class RecordWriterTest {
 		}
 
 		@Override
-		public BufferProvider getBufferProvider() {
-			return bufferProvider;
-		}
-
-		@Override
 		public ResultPartitionID getPartitionId() {
 			return partitionId;
 		}
@@ -575,6 +570,11 @@ public class RecordWriterTest {
 		}
 
 		@Override
+		public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+			return bufferProvider.requestBufferBuilderBlocking();
+		}
+
+		@Override
 		public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
 			bufferConsumer.close();
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 7cb1abd..004ad08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -119,7 +119,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
 			final ResultPartitionWriter writer = getEnvironment().getWriter(0);
 
 			for (int i = 0; i < 8; i++) {
-				final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
+				final BufferBuilder bufferBuilder = writer.getBufferBuilder();
 				writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
 				Thread.sleep(50);
 				bufferBuilder.finish();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a3bc696..74c4968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -131,7 +131,7 @@ public class LocalInputChannelTest {
 				false,
 				new TestPartitionProducerBufferSource(
 					parallelism,
-					partition.getBufferProvider(),
+					partition.getBufferPool(),
 					numberOfBuffersPerChannel)
 			);
 		}