You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/31 20:05:37 UTC
[flink] branch master updated: [FLINK-12564][network] Remove
ResultPartitionWriter#getBufferProvider()
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c53c446 [FLINK-12564][network] Remove ResultPartitionWriter#getBufferProvider()
c53c446 is described below
commit c53c446486d58e3db149a9ea6fe1984227e415b2
Author: zhijiang <wa...@aliyun.com>
AuthorDate: Sat Jun 1 04:05:25 2019 +0800
[FLINK-12564][network] Remove ResultPartitionWriter#getBufferProvider()
* [FLINK-12564][network] Refactor the method of getBufferProvider to getBufferBuilder in ResultPartitionWriter
ResultPartitionWriter#getBufferProvider seems not very general for all the writer implementations. The key point is to request a BufferBuilder
from the BufferProvider, so this method is refactored into getBufferBuilder directly. Then the internal components of ResultPartitionWriter instance
would not be exposed to outside.
* [fixup] Remove getBufferProvider from ResultPartition
---
.../runtime/io/network/api/writer/RecordWriter.java | 2 +-
.../io/network/api/writer/ResultPartitionWriter.java | 9 ++++++---
.../io/network/partition/ResultPartition.java | 14 ++++++++------
.../AbstractCollectingResultPartitionWriter.java | 11 ++++++-----
.../io/network/api/writer/RecordWriterTest.java | 20 ++++++++++----------
.../partition/PartialConsumePipelinedResultTest.java | 2 +-
.../partition/consumer/LocalInputChannelTest.java | 2 +-
7 files changed, 33 insertions(+), 27 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 1743576..cc40df0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -253,7 +253,7 @@ public class RecordWriter<T extends IOReadableWritable> {
private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
checkState(!bufferBuilders[targetChannel].isPresent() || bufferBuilders[targetChannel].get().isFinished());
- BufferBuilder bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
+ BufferBuilder bufferBuilder = targetPartition.getBufferBuilder();
bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
return bufferBuilder;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
index 153b880..cc1e49a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/ResultPartitionWriter.java
@@ -18,8 +18,8 @@
package org.apache.flink.runtime.io.network.api.writer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import javax.annotation.Nullable;
@@ -36,8 +36,6 @@ public interface ResultPartitionWriter extends AutoCloseable {
*/
void setup() throws IOException;
- BufferProvider getBufferProvider();
-
ResultPartitionID getPartitionId();
int getNumberOfSubpartitions();
@@ -45,6 +43,11 @@ public interface ResultPartitionWriter extends AutoCloseable {
int getNumTargetKeyGroups();
/**
+ * Requests a {@link BufferBuilder} from this partition for writing data.
+ */
+ BufferBuilder getBufferBuilder() throws IOException, InterruptedException;
+
+ /**
* Adds the bufferConsumer to the subpartition with the given index.
*
* <p>For PIPELINED {@link org.apache.flink.runtime.io.network.partition.ResultPartitionType}s,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 15f15e9..fef0278 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -22,10 +22,10 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -188,11 +188,6 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
return subpartitions.length;
}
- @Override
- public BufferProvider getBufferProvider() {
- return bufferPool;
- }
-
public BufferPool getBufferPool() {
return bufferPool;
}
@@ -219,6 +214,13 @@ public class ResultPartition implements ResultPartitionWriter, BufferPoolOwner {
// ------------------------------------------------------------------------
@Override
+ public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+ checkInProduceState();
+
+ return bufferPool.requestBufferBuilderBlocking();
+ }
+
+ @Override
public void addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
checkNotNull(bufferConsumer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
index 8ae8f5e..8633fe3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/AbstractCollectingResultPartitionWriter.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -49,11 +50,6 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
}
@Override
- public BufferProvider getBufferProvider() {
- return bufferProvider;
- }
-
- @Override
public ResultPartitionID getPartitionId() {
return new ResultPartitionID();
}
@@ -69,6 +65,11 @@ public abstract class AbstractCollectingResultPartitionWriter implements ResultP
}
@Override
+ public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+ return bufferProvider.requestBufferBuilderBlocking();
+ }
+
+ @Override
public synchronized void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
checkState(targetChannel < getNumberOfSubpartitions());
bufferConsumers.add(bufferConsumer);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
index 35487b8..f8c6fdd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java
@@ -480,11 +480,6 @@ public class RecordWriterTest {
}
@Override
- public BufferProvider getBufferProvider() {
- return bufferProvider;
- }
-
- @Override
public ResultPartitionID getPartitionId() {
return partitionId;
}
@@ -500,6 +495,11 @@ public class RecordWriterTest {
}
@Override
+ public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+ return bufferProvider.requestBufferBuilderBlocking();
+ }
+
+ @Override
public void addBufferConsumer(BufferConsumer buffer, int targetChannel) throws IOException {
queues[targetChannel].add(buffer);
}
@@ -555,11 +555,6 @@ public class RecordWriterTest {
}
@Override
- public BufferProvider getBufferProvider() {
- return bufferProvider;
- }
-
- @Override
public ResultPartitionID getPartitionId() {
return partitionId;
}
@@ -575,6 +570,11 @@ public class RecordWriterTest {
}
@Override
+ public BufferBuilder getBufferBuilder() throws IOException, InterruptedException {
+ return bufferProvider.requestBufferBuilderBlocking();
+ }
+
+ @Override
public void addBufferConsumer(BufferConsumer bufferConsumer, int targetChannel) throws IOException {
bufferConsumer.close();
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index 7cb1abd..004ad08 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -119,7 +119,7 @@ public class PartialConsumePipelinedResultTest extends TestLogger {
final ResultPartitionWriter writer = getEnvironment().getWriter(0);
for (int i = 0; i < 8; i++) {
- final BufferBuilder bufferBuilder = writer.getBufferProvider().requestBufferBuilderBlocking();
+ final BufferBuilder bufferBuilder = writer.getBufferBuilder();
writer.addBufferConsumer(bufferBuilder.createBufferConsumer(), 0);
Thread.sleep(50);
bufferBuilder.finish();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index a3bc696..74c4968 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -131,7 +131,7 @@ public class LocalInputChannelTest {
false,
new TestPartitionProducerBufferSource(
parallelism,
- partition.getBufferProvider(),
+ partition.getBufferPool(),
numberOfBuffersPerChannel)
);
}