You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ar...@apache.org on 2020/09/24 16:05:14 UTC
[flink] 03/07: [FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic
This is an automated email from the ASF dual-hosted git repository.
arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 88a07a9eb65514cf1874a27afc72bd430d87d11f
Author: kevin.cyj <ke...@alibaba-inc.com>
AuthorDate: Mon Aug 31 15:30:20 2020 +0800
[FLINK-19312][network] Introduce BufferWritingResultPartition which wraps the ResultSubpartition related logic
In the current abstraction, buffers are written to and read from ResultSubpartitions, which is a hash-style data writing and reading implementation. This is in contrast to implementations where records are appended to a joint structure, from which the data is drawn after the write phase is finished; for example, sort-based partitioning, which clusters data belonging to different channels by sorting on the channel index. In the future, sort-merge based ResultPartitionWriter will be implemen [...]
---
.../io/network/metrics/ResultPartitionMetrics.java | 32 ++--
.../partition/BoundedBlockingResultPartition.java | 2 +-
.../partition/BufferWritingResultPartition.java | 176 +++++++++++++++++++++
.../partition/PipelinedResultPartition.java | 6 +-
.../io/network/partition/ResultPartition.java | 121 +++-----------
...oundedBlockingSubpartitionAvailabilityTest.java | 2 +-
.../partition/FileChannelBoundedDataTest.java | 2 +-
.../network/partition/InputGateFairnessTest.java | 12 +-
.../partition/ResultPartitionFactoryTest.java | 4 +-
.../io/network/partition/ResultPartitionTest.java | 2 +-
.../partition/consumer/LocalInputChannelTest.java | 5 +-
11 files changed, 227 insertions(+), 137 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
index 2171ff3..e3ba640 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/ResultPartitionMetrics.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.io.network.metrics;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -49,13 +48,7 @@ public class ResultPartitionMetrics {
* @return total number of queued buffers
*/
long refreshAndGetTotal() {
- long total = 0;
-
- for (ResultSubpartition part : partition.getAllPartitions()) {
- total += part.unsynchronizedGetNumberOfQueuedBuffers();
- }
-
- return total;
+ return partition.getNumberOfQueuedBuffers();
}
/**
@@ -66,15 +59,15 @@ public class ResultPartitionMetrics {
*/
int refreshAndGetMin() {
int min = Integer.MAX_VALUE;
+ int numSubpartitions = partition.getNumberOfSubpartitions();
- ResultSubpartition[] allPartitions = partition.getAllPartitions();
- if (allPartitions.length == 0) {
+ if (numSubpartitions == 0) {
// meaningful value when no channels exist:
return 0;
}
- for (ResultSubpartition part : allPartitions) {
- int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+ for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+ int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
min = Math.min(min, size);
}
@@ -89,9 +82,10 @@ public class ResultPartitionMetrics {
*/
int refreshAndGetMax() {
int max = 0;
+ int numSubpartitions = partition.getNumberOfSubpartitions();
- for (ResultSubpartition part : partition.getAllPartitions()) {
- int size = part.unsynchronizedGetNumberOfQueuedBuffers();
+ for (int targetSubpartition = 0; targetSubpartition < numSubpartitions; ++targetSubpartition) {
+ int size = partition.getNumberOfQueuedBuffers(targetSubpartition);
max = Math.max(max, size);
}
@@ -105,15 +99,7 @@ public class ResultPartitionMetrics {
* @return average number of queued buffers per sub-partition
*/
float refreshAndGetAvg() {
- long total = 0;
-
- ResultSubpartition[] allPartitions = partition.getAllPartitions();
- for (ResultSubpartition part : allPartitions) {
- int size = part.unsynchronizedGetNumberOfQueuedBuffers();
- total += size;
- }
-
- return total / (float) allPartitions.length;
+ return partition.getNumberOfQueuedBuffers() / (float) partition.getNumberOfSubpartitions();
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
index fc440a0..a16cfe2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingResultPartition.java
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* per sub-partition. This implementation hence requires at least as many files (file handles) and
* memory buffers as the parallelism of the target task that the data is shuffled to.
*/
-public class BoundedBlockingResultPartition extends ResultPartition {
+public class BoundedBlockingResultPartition extends BufferWritingResultPartition {
public BoundedBlockingResultPartition(
String owningTaskName,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
new file mode 100644
index 0000000..437d0a0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
+import org.apache.flink.util.function.FunctionWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A {@link ResultPartition} which writes buffers directly to {@link ResultSubpartition}s. This
+ * is in contrast to implementations where records are written to a joint structure, from which
+ * the subpartitions draw the data after the write phase is finished, for example the sort-based
+ * partitioning.
+ *
+ * <p>To avoid confusion: On the read side, all subpartitions return buffers (and backlog) to be
+ * transported through the network.
+ */
+public class BufferWritingResultPartition extends ResultPartition {
+
+ /** The subpartitions of this partition. At least one. */
+ protected final ResultSubpartition[] subpartitions;
+
+ public BufferWritingResultPartition(
+ String owningTaskName,
+ int partitionIndex,
+ ResultPartitionID partitionId,
+ ResultPartitionType partitionType,
+ ResultSubpartition[] subpartitions,
+ int numTargetKeyGroups,
+ ResultPartitionManager partitionManager,
+ @Nullable BufferCompressor bufferCompressor,
+ FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
+
+ super(
+ owningTaskName,
+ partitionIndex,
+ partitionId,
+ partitionType,
+ subpartitions.length,
+ numTargetKeyGroups,
+ partitionManager,
+ bufferCompressor,
+ bufferPoolFactory);
+
+ this.subpartitions = checkNotNull(subpartitions);
+ }
+
+ public int getNumberOfQueuedBuffers() {
+ int totalBuffers = 0;
+
+ for (ResultSubpartition subpartition : subpartitions) {
+ totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ return totalBuffers;
+ }
+
+ public int getNumberOfQueuedBuffers(int targetSubpartition) {
+ checkArgument(targetSubpartition >= 0 && targetSubpartition < numSubpartitions);
+ return subpartitions[targetSubpartition].unsynchronizedGetNumberOfQueuedBuffers();
+ }
+
+ @Override
+ public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
+ checkInProduceState();
+
+ return bufferPool.requestBufferBuilderBlocking(targetChannel);
+ }
+
+ @Override
+ public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
+ return bufferPool.requestBufferBuilder(targetChannel);
+ }
+
+ @Override
+ public boolean addBufferConsumer(
+ BufferConsumer bufferConsumer,
+ int subpartitionIndex) throws IOException {
+ checkNotNull(bufferConsumer);
+
+ ResultSubpartition subpartition;
+ try {
+ checkInProduceState();
+ subpartition = subpartitions[subpartitionIndex];
+ }
+ catch (Exception ex) {
+ bufferConsumer.close();
+ throw ex;
+ }
+
+ return subpartition.add(bufferConsumer);
+ }
+
+ @Override
+ public void flushAll() {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.flush();
+ }
+ }
+
+ @Override
+ public void flush(int targetSubpartition) {
+ subpartitions[targetSubpartition].flush();
+ }
+
+ @Override
+ public ResultSubpartitionView createSubpartitionView(
+ int subpartitionIndex,
+ BufferAvailabilityListener availabilityListener) throws IOException {
+ checkElementIndex(subpartitionIndex, numSubpartitions, "Subpartition not found.");
+ checkState(!isReleased(), "Partition released.");
+
+ ResultSubpartition subpartition = subpartitions[subpartitionIndex];
+ ResultSubpartitionView readView = subpartition.createReadView(availabilityListener);
+
+ LOG.debug("Created {}", readView);
+
+ return readView;
+ }
+
+ @Override
+ public void finish() throws IOException {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.finish();
+ }
+ super.finish();
+ }
+
+ @Override
+ protected void releaseInternal() {
+ // Release all subpartitions
+ for (ResultSubpartition subpartition : subpartitions) {
+ try {
+ subpartition.release();
+ }
+ // Catch this in order to ensure that release is called on all subpartitions
+ catch (Throwable t) {
+ LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public ResultSubpartition[] getAllPartitions() {
+ return subpartitions;
+ }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
index dc9da76..a1888c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedResultPartition.java
@@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* {@link #onConsumedSubpartition(int)}) then the partition as a whole is disposed and all buffers are
* freed.
*/
-public class PipelinedResultPartition extends ResultPartition
+public class PipelinedResultPartition extends BufferWritingResultPartition
implements CheckpointedResultPartition, ChannelStateHolder {
/** The lock that guard release operations (which can be asynchronously propagated from the
@@ -134,12 +134,12 @@ public class PipelinedResultPartition extends ResultPartition
@Override
public CheckpointedResultSubpartition getCheckpointedSubpartition(int subpartitionIndex) {
- return (CheckpointedResultSubpartition) getAllPartitions()[subpartitionIndex];
+ return (CheckpointedResultSubpartition) subpartitions[subpartitionIndex];
}
@Override
public void readRecoveredState(ChannelStateReader stateReader) throws IOException, InterruptedException {
- for (ResultSubpartition subPar : getAllPartitions()) {
+ for (ResultSubpartition subPar : subpartitions) {
((PipelinedSubpartition) subPar).readRecoveredState(stateReader);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
index 15430b9..02c91c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
@@ -21,9 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
-import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferPoolOwner;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -43,7 +41,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkElementIndex;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@@ -52,8 +49,8 @@ import static org.apache.flink.util.Preconditions.checkState;
*
* <p>This class is the runtime part of a logical {@link IntermediateResultPartition}. Essentially,
* a result partition is a collection of {@link Buffer} instances. The buffers are organized in one
- * or more {@link ResultSubpartition} instances, which further partition the data depending on the
- * number of consuming tasks and the data {@link DistributionPattern}.
+ * or more {@link ResultSubpartition} instances or in a joint structure which further partition the
+ * data depending on the number of consuming tasks and the data {@link DistributionPattern}.
*
* <p>Tasks, which consume a result partition have to request one of its subpartitions. The request
* happens either remotely (see {@link RemoteInputChannel}) or locally (see {@link LocalInputChannel})
@@ -84,18 +81,17 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
/** Type of this partition. Defines the concrete subpartition implementation to use. */
protected final ResultPartitionType partitionType;
- /** The subpartitions of this partition. At least one. */
- protected final ResultSubpartition[] subpartitions;
-
protected final ResultPartitionManager partitionManager;
+ protected final int numSubpartitions;
+
private final int numTargetKeyGroups;
// - Runtime state --------------------------------------------------------
private final AtomicBoolean isReleased = new AtomicBoolean();
- private BufferPool bufferPool;
+ protected BufferPool bufferPool;
private boolean isFinished;
@@ -112,7 +108,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
int partitionIndex,
ResultPartitionID partitionId,
ResultPartitionType partitionType,
- ResultSubpartition[] subpartitions,
+ int numSubpartitions,
int numTargetKeyGroups,
ResultPartitionManager partitionManager,
@Nullable BufferCompressor bufferCompressor,
@@ -123,7 +119,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
this.partitionIndex = partitionIndex;
this.partitionId = checkNotNull(partitionId);
this.partitionType = checkNotNull(partitionType);
- this.subpartitions = checkNotNull(subpartitions);
+ this.numSubpartitions = numSubpartitions;
this.numTargetKeyGroups = numTargetKeyGroups;
this.partitionManager = checkNotNull(partitionManager);
this.bufferCompressor = bufferCompressor;
@@ -164,22 +160,22 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
@Override
public int getNumberOfSubpartitions() {
- return subpartitions.length;
+ return numSubpartitions;
}
public BufferPool getBufferPool() {
return bufferPool;
}
- public int getNumberOfQueuedBuffers() {
- int totalBuffers = 0;
-
- for (ResultSubpartition subpartition : subpartitions) {
- totalBuffers += subpartition.unsynchronizedGetNumberOfQueuedBuffers();
- }
+ /**
+ * Returns the total number of queued buffers of all subpartitions.
+ */
+ public abstract int getNumberOfQueuedBuffers();
- return totalBuffers;
- }
+ /**
+ * Returns the number of queued buffers of the given target subpartition.
+ */
+ public abstract int getNumberOfQueuedBuffers(int targetSubpartition);
/**
* Returns the type of this result partition.
@@ -192,48 +188,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
// ------------------------------------------------------------------------
- @Override
- public BufferBuilder getBufferBuilder(int targetChannel) throws IOException, InterruptedException {
- checkInProduceState();
-
- return bufferPool.requestBufferBuilderBlocking(targetChannel);
- }
-
- @Override
- public BufferBuilder tryGetBufferBuilder(int targetChannel) throws IOException {
- BufferBuilder bufferBuilder = bufferPool.requestBufferBuilder(targetChannel);
- return bufferBuilder;
- }
-
- @Override
- public boolean addBufferConsumer(BufferConsumer bufferConsumer, int subpartitionIndex) throws IOException {
- checkNotNull(bufferConsumer);
-
- ResultSubpartition subpartition;
- try {
- checkInProduceState();
- subpartition = subpartitions[subpartitionIndex];
- }
- catch (Exception ex) {
- bufferConsumer.close();
- throw ex;
- }
-
- return subpartition.add(bufferConsumer);
- }
-
- @Override
- public void flushAll() {
- for (ResultSubpartition subpartition : subpartitions) {
- subpartition.flush();
- }
- }
-
- @Override
- public void flush(int subpartitionIndex) {
- subpartitions[subpartitionIndex].flush();
- }
-
/**
* Finishes the result partition.
*
@@ -245,10 +199,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
public void finish() throws IOException {
checkInProduceState();
- for (ResultSubpartition subpartition : subpartitions) {
- subpartition.finish();
- }
-
isFinished = true;
}
@@ -268,19 +218,15 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
this.cause = cause;
}
- // Release all subpartitions
- for (ResultSubpartition subpartition : subpartitions) {
- try {
- subpartition.release();
- }
- // Catch this in order to ensure that release is called on all subpartitions
- catch (Throwable t) {
- LOG.error("Error during release of result subpartition: " + t.getMessage(), t);
- }
- }
+ releaseInternal();
}
}
+ /**
+ * Releases all produced data including both those stored in memory and persisted on disk.
+ */
+ protected abstract void releaseInternal();
+
@Override
public void close() {
if (bufferPool != null) {
@@ -293,21 +239,6 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
partitionManager.releasePartition(partitionId, throwable);
}
- /**
- * Returns the requested subpartition.
- */
- @Override
- public ResultSubpartitionView createSubpartitionView(int index, BufferAvailabilityListener availabilityListener) throws IOException {
- checkElementIndex(index, subpartitions.length, "Subpartition not found.");
- checkState(!isReleased.get(), "Partition released.");
-
- ResultSubpartitionView readView = subpartitions[index].createReadView(availabilityListener);
-
- LOG.debug("Created {}", readView);
-
- return readView;
- }
-
public Throwable getFailureCause() {
return cause;
}
@@ -346,7 +277,7 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
@Override
public String toString() {
return "ResultPartition " + partitionId.toString() + " [" + partitionType + ", "
- + subpartitions.length + " subpartitions]";
+ + numSubpartitions + " subpartitions]";
}
// ------------------------------------------------------------------------
@@ -364,13 +295,9 @@ public abstract class ResultPartition implements ResultPartitionWriter, BufferPo
this, subpartitionIndex);
}
- public ResultSubpartition[] getAllPartitions() {
- return subpartitions;
- }
-
// ------------------------------------------------------------------------
- private void checkInProduceState() throws IllegalStateException {
+ protected void checkInProduceState() throws IllegalStateException {
checkState(!isFinished, "Partition already finished.");
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
index 8a81dc8..fd1188c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionAvailabilityTest.java
@@ -122,7 +122,7 @@ public class BoundedBlockingSubpartitionAvailabilityTest {
// ------------------------------------------------------------------------
private static ResultSubpartition createPartitionWithData(int numberOfBuffers) throws IOException {
- ResultPartition parent = new ResultPartitionBuilder()
+ BoundedBlockingResultPartition parent = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
.setResultPartitionType(ResultPartitionType.BLOCKING_PERSISTENT)
.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
.setFileChannelManager(new FileChannelManagerImpl(new String[] { TMP_FOLDER.newFolder().toString() }, "data"))
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
index aa209ec..559d13d 100755
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/FileChannelBoundedDataTest.java
@@ -152,7 +152,7 @@ public class FileChannelBoundedDataTest extends BoundedDataTestBase {
}
private static ResultSubpartition createFileBoundedBlockingSubpartition() {
- final ResultPartition resultPartition = new ResultPartitionBuilder()
+ final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) new ResultPartitionBuilder()
.setNetworkBufferSize(BUFFER_SIZE)
.setResultPartitionType(ResultPartitionType.BLOCKING)
.setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType.FILE)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index e770a65..b6fadf8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -64,9 +64,9 @@ public class InputGateFairnessTest {
final int numberOfChannels = 37;
final int buffersPerChannel = 27;
- ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
- .mapToObj(i -> new ResultPartitionBuilder().build())
- .toArray(ResultPartition[]::new);
+ PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+ .mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+ .toArray(PipelinedResultPartition[]::new);
final BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42);
// ----- create some source channels and fill them with buffers -----
@@ -124,9 +124,9 @@ public class InputGateFairnessTest {
final int numberOfChannels = 37;
final int buffersPerChannel = 27;
- ResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
- .mapToObj(i -> new ResultPartitionBuilder().build())
- .toArray(ResultPartition[]::new);
+ PipelinedResultPartition[] resultPartitions = IntStream.range(0, numberOfChannels)
+ .mapToObj(i -> (PipelinedResultPartition) new ResultPartitionBuilder().build())
+ .toArray(PipelinedResultPartition[]::new);
try (BufferConsumer bufferConsumer = createFilledFinishedBufferConsumer(42)) {
// ----- create some source channels and fill them with one buffer each -----
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 61af900..903878b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -61,13 +61,13 @@ public class ResultPartitionFactoryTest extends TestLogger {
@Test
public void testBoundedBlockingSubpartitionsCreated() {
- final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
+ final BoundedBlockingResultPartition resultPartition = (BoundedBlockingResultPartition) createResultPartition(ResultPartitionType.BLOCKING);
Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
}
@Test
public void testPipelinedSubpartitionsCreated() {
- final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
+ final PipelinedResultPartition resultPartition = (PipelinedResultPartition) createResultPartition(ResultPartitionType.PIPELINED);
Arrays.stream(resultPartition.subpartitions).forEach(sp -> assertThat(sp, instanceOf(PipelinedSubpartition.class)));
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index 7f419ba..e293422 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -95,7 +95,7 @@ public class ResultPartitionTest {
final int numSubpartitions = 3;
for (int i = 0; i < numPartitions; i++) {
- final ResultPartition partition = new ResultPartitionBuilder()
+ final PipelinedResultPartition partition = (PipelinedResultPartition) new ResultPartitionBuilder()
.setResultPartitionIndex(i)
.setNumberOfSubpartitions(numSubpartitions)
.build();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index d0bb400..6e65ad74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
+import org.apache.flink.runtime.io.network.partition.PipelinedResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -461,7 +462,7 @@ public class LocalInputChannelTest {
public void testCheckpointingInflightData() throws Exception {
SingleInputGate inputGate = new SingleInputGateBuilder().build();
- ResultPartition parent = PartitionTestUtils.createPartition(
+ PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
ResultPartitionType.PIPELINED,
NoOpFileChannelManager.INSTANCE);
ResultSubpartition subpartition = parent.getAllPartitions()[0];
@@ -501,7 +502,7 @@ public class LocalInputChannelTest {
private static ResultSubpartitionView createResultSubpartitionView(boolean addBuffer) throws IOException {
int bufferSize = 4096;
- ResultPartition parent = PartitionTestUtils.createPartition(
+ PipelinedResultPartition parent = (PipelinedResultPartition) PartitionTestUtils.createPartition(
ResultPartitionType.PIPELINED,
NoOpFileChannelManager.INSTANCE,
true,