You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:27 UTC

[flink] 02/10: [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit efa7abf2c927958447d6533a3cbd5ccc105a254f
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Apr 26 18:31:33 2019 +0800

    [hotfix][tests][network] Introduce ResultPartitionBuilder for creation of ResultPartition in tests
---
 .../runtime/io/network/NetworkEnvironmentTest.java |   9 +-
 .../io/network/partition/PartitionTestUtils.java   |  43 +++-----
 .../network/partition/ResultPartitionBuilder.java  | 113 +++++++++++++++++++++
 .../partition/consumer/LocalInputChannelTest.java  |  32 ++----
 .../StreamNetworkBenchmarkEnvironment.java         |  22 ++--
 5 files changed, 148 insertions(+), 71 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index bcb4d04..a610838 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -272,8 +272,11 @@ public class NetworkEnvironmentTest {
 	 */
 	private static ResultPartition createResultPartition(
 			final ResultPartitionType partitionType, final int channels) {
-
-		return PartitionTestUtils.createPartition(partitionType, channels);
+		return new ResultPartitionBuilder()
+			.setResultPartitionType(partitionType)
+			.setNumberOfSubpartitions(channels)
+			.setNumTargetKeyGroups(channels)
+			.build();
 	}
 
 	/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 3391030..b52bffe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.io.disk.iomanager.NoOpIOManager;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
-
 /**
  * This class should consolidate all mocking logic for ResultPartitions.
  * While using Mockito internally (for now), the use of Mockito should not
@@ -34,41 +30,26 @@ public class PartitionTestUtils {
 	}
 
 	public static ResultPartition createPartition(ResultPartitionType type) {
-		return createPartition(
-				new NoOpResultPartitionConsumableNotifier(),
-				type,
-				false);
-	}
-
-	public static ResultPartition createPartition(ResultPartitionType type, int numChannels) {
-		return createPartition(new NoOpResultPartitionConsumableNotifier(), type, numChannels, false);
+		return new ResultPartitionBuilder().setResultPartitionType(type).build();
 	}
 
 	public static ResultPartition createPartition(
 			ResultPartitionConsumableNotifier notifier,
 			ResultPartitionType type,
 			boolean sendScheduleOrUpdateConsumersMessage) {
-
-		return createPartition(notifier, type, 1, sendScheduleOrUpdateConsumersMessage);
+		return new ResultPartitionBuilder()
+			.setResultPartitionConsumableNotifier(notifier)
+			.setResultPartitionType(type)
+			.setSendScheduleOrUpdateConsumersMessage(sendScheduleOrUpdateConsumersMessage)
+			.build();
 	}
 
 	public static ResultPartition createPartition(
-			ResultPartitionConsumableNotifier notifier,
-			ResultPartitionType type,
-			int numChannels,
-			boolean sendScheduleOrUpdateConsumersMessage) {
-
-		return new ResultPartition(
-				"TestTask",
-				new NoOpTaskActions(),
-				new JobID(),
-				new ResultPartitionID(),
-				type,
-				numChannels,
-				numChannels,
-				new ResultPartitionManager(),
-				notifier,
-				new NoOpIOManager(),
-				sendScheduleOrUpdateConsumersMessage);
+			ResultPartitionType partitionType,
+			int numChannels) {
+		return new ResultPartitionBuilder()
+			.setResultPartitionType(partitionType)
+			.setNumberOfSubpartitions(numChannels)
+			.build();
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
new file mode 100644
index 0000000..3d7dab0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link ResultPartition} instance.
+ */
+public class ResultPartitionBuilder {
+
+	private static final String taskName = "Result Partition";
+
+	private JobID jobId = new JobID();
+
+	private final TaskActions taskActions = new NoOpTaskActions();
+
+	private ResultPartitionID partitionId = new ResultPartitionID();
+
+	private ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
+
+	private int numberOfSubpartitions = 1;
+
+	private int numTargetKeyGroups = 1;
+
+	private ResultPartitionManager partitionManager = new ResultPartitionManager();
+
+	private ResultPartitionConsumableNotifier partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
+
+	private IOManager ioManager = new IOManagerAsync();
+
+	private boolean sendScheduleOrUpdateConsumersMessage = false;
+
+	public ResultPartitionBuilder setJobId(JobID jobId) {
+		this.jobId = jobId;
+		return this;
+	}
+
+	public ResultPartitionBuilder setResultPartitionId(ResultPartitionID partitionId) {
+		this.partitionId = partitionId;
+		return this;
+	}
+
+	public ResultPartitionBuilder setResultPartitionType(ResultPartitionType partitionType) {
+		this.partitionType = partitionType;
+		return this;
+	}
+
+	public ResultPartitionBuilder setNumberOfSubpartitions(int numberOfSubpartitions) {
+		this.numberOfSubpartitions = numberOfSubpartitions;
+		return this;
+	}
+
+	public ResultPartitionBuilder setNumTargetKeyGroups(int numTargetKeyGroups) {
+		this.numTargetKeyGroups = numTargetKeyGroups;
+		return this;
+	}
+
+	public ResultPartitionBuilder setResultPartitionManager(ResultPartitionManager partitionManager) {
+		this.partitionManager = partitionManager;
+		return this;
+	}
+
+	ResultPartitionBuilder setResultPartitionConsumableNotifier(ResultPartitionConsumableNotifier notifier) {
+		this.partitionConsumableNotifier = notifier;
+		return this;
+	}
+
+	public ResultPartitionBuilder setIOManager(IOManager ioManager) {
+		this.ioManager = ioManager;
+		return this;
+	}
+
+	public ResultPartitionBuilder setSendScheduleOrUpdateConsumersMessage(boolean sendScheduleOrUpdateConsumersMessage) {
+		this.sendScheduleOrUpdateConsumersMessage = sendScheduleOrUpdateConsumersMessage;
+		return this;
+	}
+
+	public ResultPartition build() {
+		return new ResultPartition(
+			taskName,
+			taskActions,
+			jobId,
+			partitionId,
+			partitionType,
+			numberOfSubpartitions,
+			numTargetKeyGroups,
+			partitionManager,
+			partitionConsumableNotifier,
+			ioManager,
+			sendScheduleOrUpdateConsumersMessage);
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 8d50a39..4654998 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -18,10 +18,8 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -29,19 +27,16 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -107,12 +102,6 @@ public class LocalInputChannelTest {
 			(parallelism * producerBufferPoolSize) + (parallelism * parallelism),
 			TestBufferFactory.BUFFER_SIZE);
 
-		final ResultPartitionConsumableNotifier partitionConsumableNotifier = new NoOpResultPartitionConsumableNotifier();
-
-		final IOManager ioManager = mock(IOManager.class);
-
-		final JobID jobId = new JobID();
-
 		final ResultPartitionManager partitionManager = new ResultPartitionManager();
 
 		final ResultPartitionID[] partitionIds = new ResultPartitionID[parallelism];
@@ -122,18 +111,13 @@ public class LocalInputChannelTest {
 		for (int i = 0; i < parallelism; i++) {
 			partitionIds[i] = new ResultPartitionID();
 
-			final ResultPartition partition = new ResultPartition(
-				"Test Name",
-				new NoOpTaskActions(),
-				jobId,
-				partitionIds[i],
-				ResultPartitionType.PIPELINED,
-				parallelism,
-				parallelism,
-				partitionManager,
-				partitionConsumableNotifier,
-				ioManager,
-				true);
+			final ResultPartition partition = new ResultPartitionBuilder()
+				.setResultPartitionId(partitionIds[i])
+				.setNumberOfSubpartitions(parallelism)
+				.setNumTargetKeyGroups(parallelism)
+				.setResultPartitionManager(partitionManager)
+				.setSendScheduleOrUpdateConsumersMessage(true)
+				.build();
 
 			// Create a buffer pool for this partition
 			partition.registerBufferPool(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
index 952b4bb..9dd43fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkBenchmarkEnvironment.java
@@ -38,8 +38,8 @@ import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
-import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -211,18 +211,14 @@ public class StreamNetworkBenchmarkEnvironment<T extends IOReadableWritable> {
 			NetworkEnvironment environment,
 			int channels) throws Exception {
 
-		ResultPartition resultPartition = new ResultPartition(
-			"sender task",
-			new NoOpTaskActions(),
-			jobId,
-			partitionId,
-			ResultPartitionType.PIPELINED_BOUNDED,
-			channels,
-			1,
-			environment.getResultPartitionManager(),
-			new NoOpResultPartitionConsumableNotifier(),
-			ioManager,
-			false);
+		ResultPartition resultPartition = new ResultPartitionBuilder()
+			.setJobId(jobId)
+			.setResultPartitionId(partitionId)
+			.setResultPartitionType(ResultPartitionType.PIPELINED_BOUNDED)
+			.setNumberOfSubpartitions(channels)
+			.setResultPartitionManager(environment.getResultPartitionManager())
+			.setIOManager(ioManager)
+			.build();
 
 		environment.setupPartition(resultPartition);