You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:27 UTC
[flink] 02/10: [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit efa7abf2c927958447d6533a3cbd5ccc105a254f
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Apr 26 18:31:33 2019 +0800
[hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests
---
.../runtime/io/network/NetworkEnvironmentTest.java | 9 +-
.../io/network/partition/PartitionTestUtils.java | 43 +++-----
.../network/partition/ResultPartitionBuilder.java | 113 +++++++++++++++++++++
.../partition/consumer/LocalInputChannelTest.java | 32 ++----
.../StreamNetworkBenchmarkEnvironment.java | 22 ++--
5 files changed, 148 insertions(+), 71 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bcb4d04..a610838 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -272,8 +272,11 @@ public class NetworkEnvironmentTest {
*/
private static ResultPartition createResultPartition(
final ResultPartitionType partitionType, final int channels) {
-
- return PartitionTestUtils.createPartition(partitionType, channels);
+ return new ResultPartitionBuilder()
+ .setResultPartitionType(partitionType)
+ .setNumberOfSubpartitions(channels)
+ .setNumTargetKeyGroups(channels)
+ .build();
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 3391030..b52bffe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,10 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
-
/**
* This class should consolidate all mocking logic for ResultPartitions.
* While using Mockito internally (for now), the use of Mockito should not
@@ -34,41 +30,26 @@ public class PartitionTestUtils {
}
public static ResultPartition createPartition(ResultPartitionType type) {
- return createPartition(
- new NoOpResultPartitionConsumableNotifier(),
- type,
- false);
- }
-
- public static ResultPartition createPartition(ResultPartitionType type, int numChannels) {
- return createPartition(new NoOpResultPartitionConsumableNotifier(), type, numChannels, false);
+ return new ResultPartitionBuilder().setResultPartitionType(type).build();
}
public static ResultPartition createPartition(
ResultPartitionConsumableNotifier notifier,
ResultPartitionType type,
boolean sendScheduleOrUpdateConsumersMessage) {
-
- return createPartition(notifier, type, 1, sendScheduleOrUpdateConsumersMessage);
+ return new ResultPartitionBuilder()
+ .setResultPartitionConsumableNotifier(notifier)
+ .setResultPartitionType(type)
+ .setSendScheduleOrUpdateConsumersMessage(sendScheduleOrUpdateConsumersMessage)
+ .build();
}
public static ResultPartition createPartition(
- ResultPartitionConsumableNotifier notifier,
- ResultPartitionType type,
- int numChannels,
- boolean sendScheduleOrUpdateConsumersMessage) {
-
- return new ResultPartition(
- "TestTask",
- new NoOpTaskActions(),
- new JobID(),
- new ResultPartitionID(),
- type,
- numChannels,
- numChannels,
- new ResultPartitionManager(),
- notifier,
- new NoOpIOManager(),
- sendScheduleOrUpdateConsumersMessage);
+ ResultPartitionType partitionType,
+ int numChannels) {
+ return new ResultPartitionBuilder()
+ .setResultPartitionType(partitionType)
+ .setNumberOfSubpartitions(numChannels)
+ .build();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
new file mode 100644
index 0000000..3d7dab0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
+ */
+public class ResultPartitionBuilder {
+
+ private static final String taskName = "Result Partition";
+
+ private JobID jobId = new JobID();
+
+ private final TaskActions taskActions = new NoOpTaskActions();
+
+ private ResultPartitionID partitionId = new ResultPartitionID();
+
+ private ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
+
+ private int numberOfSubpartitions = 1;
+
+ private int numTargetKeyGroups = 1;
+
+ private ResultPartitionManager partitionManager = new ResultPartitionManager();
+
+ private ResultPartitionConsumableNotifier partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
+
+ private IOManager ioManager = new IOManagerAsync();
+
+ private boolean sendScheduleOrUpdateConsumersMessage = false;
+
+ public ResultPartitionBuilder setJobId(JobID jobId) {
+ this.jobId = jobId;
+ return this;
+ }
+
+ public ResultPartitionBuilder setResultPartitionId(ResultPartitionID partitionId) {
+ this.partitionId = partitionId;
+ return this;
+ }
+
+ public ResultPartitionBuilder setResultPartitionType(ResultPartitionType partitionType) {
+ this.partitionType = partitionType;
+ return this;
+ }
+
+ public ResultPartitionBuilder setNumberOfSubpartitions(int numberOfSubpartitions) {
+ this.numberOfSubpartitions = numberOfSubpartitions;
+ return this;
+ }
+
+ public ResultPartitionBuilder setNumTargetKeyGroups(int numTargetKeyGroups) {
+ this.numTargetKeyGroups = numTargetKeyGroups;
+ return this;
+ }
+
+ public ResultPartitionBuilder setResultPartitionManager(ResultPartitionManager partitionManager) {
+ this.partitionManager = partitionManager;
+ return this;
+ }
+
+ ResultPartitionBuilder setResultPartitionConsumableNotifier(ResultPartitionConsumableNotifier notifier) {
+ this.partitionConsumableNotifier = notifier;
+ return this;
+ }
+
+ public ResultPartitionBuilder setIOManager(IOManager ioManager) {
+ this.ioManager = ioManager;
+ return this;
+ }
+
+ public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(boolean sendScheduleOrUpdateConsumersMessage) {
+ this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
+ return this;
+ }
+
+ public ResultPartition build() {
+ return new ResultPartition(
+ taskName,
+ taskActions,
+ jobId,
+ partitionId,
+ partitionType,
+ numberOfSubpartitions,
+ numTargetKeyGroups,
+ partitionManager,
+ partitionConsumableNotifier,
+ ioManager,
+ sendScheduleOrUpdateConsumersMessage);
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8d50a39..4654998 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -18,10 +18,8 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -29,19 +27,16 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -107,12 +102,6 @@ public class LocalInputChannelTest {
(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
TestBufferFactory.BUFFER_SIZE);
- final ResultPartitionConsumableNotifier partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
-
- final IOManager ioManager = mock(IOManager.class);
-
- final JobID jobId = new JobID();
-
final ResultPartitionManager partitionManager = new ResultPartitionManager();
final ResultPartitionID[] partitionIds = new ResultPartitionID[parallelism];
@@ -122,18 +111,13 @@ public class LocalInputChannelTest {
for (int i = 0; i < parallelism; i++) {
partitionIds[i] = new ResultPartitionID();
- final ResultPartition partition = new ResultPartition(
- "Test Name",
- new NoOpTaskActions(),
- jobId,
- partitionIds[i],
- ResultPartitionType.PIPELINED,
- parallelism,
- parallelism,
- partitionManager,
- partitionConsumableNotifier,
- ioManager,
- true);
+ final ResultPartition partition = new ResultPartitionBuilder()
+ .setResultPartitionId(partitionIds[i])
+ .setNumberOfSubpartitions(parallelism)
+ .setNumTargetKeyGroups(parallelism)
+ .setResultPartitionManager(partitionManager)
+ .setSendScheduleOrUpdateConsumersMessage(true)
+ .build();
// Create a buffer pool for this partition
partition.registerBufferPool(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 952b4bb..9dd43fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -211,18 +211,14 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
NetworkEnvironment environment,
int channels) throws Exception {
- ResultPartition resultPartition = new ResultPartition(
- "sender task",
- new NoOpTaskActions(),
- jobId,
- partitionId,
- ResultPartitionType.PIPELINED_BOUNDED,
- channels,
- 1,
- environment.getResultPartitionManager(),
- new NoOpResultPartitionConsumableNotifier(),
- ioManager,
- false);
+ ResultPartition resultPartition = new ResultPartitionBuilder()
+ .setJobId(jobId)
+ .setResultPartitionId(partitionId)
+ .setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+ .setNumberOfSubpartitions(channels)
+ .setResultPartitionManager(environment.getResultPartitionManager())
+ .setIOManager(ioManager)
+ .build();
environment.setupPartition(resultPartition);