You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:26 UTC

[flink] 01/10: [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 15811a5d8c7362ac64828b771951dfa8304d98eb
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Apr 26 17:13:08 2019 +0800

    [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests
---
 .../runtime/io/network/NetworkEnvironmentTest.java |  7 +-
 .../network/partition/InputChannelTestUtils.java   | 24 +------
 .../partition/consumer/InputGateTestBase.java      | 10 +--
 .../partition/consumer/LocalInputChannelTest.java  | 16 ++---
 .../partition/consumer/SingleInputGateBuilder.java | 82 ++++++++++++++++++++++
 5 files changed, 99 insertions(+), 40 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f20feb7..bcb4d04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.taskmanager.Task;
 
 import org.junit.Rule;
@@ -287,7 +288,11 @@ public class NetworkEnvironmentTest {
 	 * @return input gate with some fake settings
 	 */
 	private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
-		return spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, partitionType, enableCreditBasedFlowControl));
+		return spy(new SingleInputGateBuilder()
+			.setNumberOfChannels(numberOfChannels)
+			.setResultPartitionType(partitionType)
+			.setIsCreditBased(enableCreditBasedFlowControl)
+			.build());
 	}
 
 	private static void createRemoteInputChannel(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 4feee4e..82b4c5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -28,9 +26,8 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -75,24 +72,7 @@ public class InputChannelTestUtils {
 	}
 
 	public static SingleInputGate createSingleInputGate(int numberOfChannels) {
-		return createSingleInputGate(numberOfChannels, ResultPartitionType.PIPELINED, true);
-	}
-
-	public static SingleInputGate createSingleInputGate(
-		int numberOfChannels,
-		ResultPartitionType partitionType,
-		boolean isCreditBased) {
-
-		return new SingleInputGate(
-			"InputGate",
-			new JobID(),
-			new IntermediateDataSetID(),
-			partitionType,
-			0,
-			numberOfChannels,
-			new NoOpTaskActions(),
-			new SimpleCounter(),
-			isCreditBased);
+		return new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).build();
 	}
 
 	public static ConnectionManager createDummyConnectionManager() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
index ddf027c..4d254c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -80,10 +79,11 @@ public abstract class InputGateTestBase {
 
 	protected SingleInputGate createInputGate(
 			int numberOfInputChannels, ResultPartitionType partitionType) {
-		SingleInputGate inputGate = createSingleInputGate(
-			numberOfInputChannels,
-			partitionType,
-			enableCreditBasedFlowControl);
+		SingleInputGate inputGate = new SingleInputGateBuilder()
+			.setNumberOfChannels(numberOfInputChannels)
+			.setResultPartitionType(partitionType)
+			.setIsCreditBased(enableCreditBasedFlowControl)
+			.build();
 
 		assertEquals(partitionType, inputGate.getConsumedPartitionType());
 		return inputGate;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5e99995..8d50a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
 import org.apache.flink.util.function.CheckedSupplier;
@@ -452,16 +450,10 @@ public class LocalInputChannelTest {
 			checkArgument(numberOfInputChannels >= 1);
 			checkArgument(numberOfExpectedBuffersPerChannel >= 1);
 
-			this.inputGate = new SingleInputGate(
-				"Test Name",
-				new JobID(),
-				new IntermediateDataSetID(),
-				ResultPartitionType.PIPELINED,
-				subpartitionIndex,
-				numberOfInputChannels,
-				new NoOpTaskActions(),
-				new SimpleCounter(),
-				true);
+			this.inputGate = new SingleInputGateBuilder()
+				.setConsumedSubpartitionIndex(subpartitionIndex)
+				.setNumberOfChannels(numberOfInputChannels)
+				.build();
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
new file mode 100644
index 0000000..28394a3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Utility class to encapsulate the logic of building a {@link SingleInputGate} instance.
+ */
+public class SingleInputGateBuilder {
+
+	private final JobID jobId = new JobID();
+
+	private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
+
+	private ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
+
+	private int consumedSubpartitionIndex = 0;
+
+	private int numberOfChannels = 1;
+
+	private final TaskActions taskActions = new NoOpTaskActions();
+
+	private final Counter numBytesInCounter = new SimpleCounter();
+
+	private boolean isCreditBased = true;
+
+	public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) {
+		this.partitionType = partitionType;
+		return this;
+	}
+
+	SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) {
+		this.consumedSubpartitionIndex = consumedSubpartitionIndex;
+		return this;
+	}
+
+	public SingleInputGateBuilder setNumberOfChannels(int numberOfChannels) {
+		this.numberOfChannels = numberOfChannels;
+		return this;
+	}
+
+	public SingleInputGateBuilder setIsCreditBased(boolean isCreditBased) {
+		this.isCreditBased = isCreditBased;
+		return this;
+	}
+
+	public SingleInputGate build() {
+		return new SingleInputGate(
+			"Single Input Gate",
+			jobId,
+			intermediateDataSetID,
+			partitionType,
+			consumedSubpartitionIndex,
+			numberOfChannels,
+			taskActions,
+			numBytesInCounter,
+			isCreditBased);
+	}
+}