You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:26 UTC
[flink] 01/10: [hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 15811a5d8c7362ac64828b771951dfa8304d98eb
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Apr 26 17:13:08 2019 +0800
[hotfix][tests][network] Introduce SingleInputGateBuilder for creation of SingleInputGate in tests
---
.../runtime/io/network/NetworkEnvironmentTest.java | 7 +-
.../network/partition/InputChannelTestUtils.java | 24 +------
.../partition/consumer/InputGateTestBase.java | 10 +--
.../partition/consumer/LocalInputChannelTest.java | 16 ++---
.../partition/consumer/SingleInputGateBuilder.java | 82 ++++++++++++++++++++++
5 files changed, 99 insertions(+), 40 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index f20feb7..bcb4d04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.taskmanager.Task;
import org.junit.Rule;
@@ -287,7 +288,11 @@ public class NetworkEnvironmentTest {
* @return input gate with some fake settings
*/
private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
- return spy(InputChannelTestUtils.createSingleInputGate(numberOfChannels, partitionType, enableCreditBasedFlowControl));
+ return spy(new SingleInputGateBuilder()
+ .setNumberOfChannels(numberOfChannels)
+ .setResultPartitionType(partitionType)
+ .setIsCreditBased(enableCreditBasedFlowControl)
+ .build());
}
private static void createRemoteInputChannel(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 4feee4e..82b4c5b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.partition;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -28,9 +26,8 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
-import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -75,24 +72,7 @@ public class InputChannelTestUtils {
}
public static SingleInputGate createSingleInputGate(int numberOfChannels) {
- return createSingleInputGate(numberOfChannels, ResultPartitionType.PIPELINED, true);
- }
-
- public static SingleInputGate createSingleInputGate(
- int numberOfChannels,
- ResultPartitionType partitionType,
- boolean isCreditBased) {
-
- return new SingleInputGate(
- "InputGate",
- new JobID(),
- new IntermediateDataSetID(),
- partitionType,
- 0,
- numberOfChannels,
- new NoOpTaskActions(),
- new SimpleCounter(),
- isCreditBased);
+ return new SingleInputGateBuilder().setNumberOfChannels(numberOfChannels).build();
}
public static ConnectionManager createDummyConnectionManager() throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
index ddf027c..4d254c0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateTestBase.java
@@ -29,7 +29,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -80,10 +79,11 @@ public abstract class InputGateTestBase {
protected SingleInputGate createInputGate(
int numberOfInputChannels, ResultPartitionType partitionType) {
- SingleInputGate inputGate = createSingleInputGate(
- numberOfInputChannels,
- partitionType,
- enableCreditBasedFlowControl);
+ SingleInputGate inputGate = new SingleInputGateBuilder()
+ .setNumberOfChannels(numberOfInputChannels)
+ .setResultPartitionType(partitionType)
+ .setIsCreditBased(enableCreditBasedFlowControl)
+ .build();
assertEquals(partitionType, inputGate.getConsumedPartitionType());
return inputGate;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5e99995..8d50a39 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.apache.flink.util.function.CheckedSupplier;
@@ -452,16 +450,10 @@ public class LocalInputChannelTest {
checkArgument(numberOfInputChannels >= 1);
checkArgument(numberOfExpectedBuffersPerChannel >= 1);
- this.inputGate = new SingleInputGate(
- "Test Name",
- new JobID(),
- new IntermediateDataSetID(),
- ResultPartitionType.PIPELINED,
- subpartitionIndex,
- numberOfInputChannels,
- new NoOpTaskActions(),
- new SimpleCounter(),
- true);
+ this.inputGate = new SingleInputGateBuilder()
+ .setConsumedSubpartitionIndex(subpartitionIndex)
+ .setNumberOfChannels(numberOfInputChannels)
+ .build();
// Set buffer pool
inputGate.setBufferPool(bufferPool);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
new file mode 100644
index 0000000..28394a3
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
+import org.apache.flink.runtime.taskmanager.TaskActions;
+
+/**
+ * Test-only builder for {@link SingleInputGate}: every constructor argument has a default here, so callers override only the values they care about before calling {@link #build()}.
+ */
+public class SingleInputGateBuilder {
+
+ private final JobID jobId = new JobID(); // created once per builder instance; no setter is offered
+
+ private final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID(); // created once per builder instance; no setter is offered
+
+ private ResultPartitionType partitionType = ResultPartitionType.PIPELINED; // default unless setResultPartitionType is called
+
+ private int consumedSubpartitionIndex = 0; // default unless setConsumedSubpartitionIndex is called
+
+ private int numberOfChannels = 1; // default unless setNumberOfChannels is called
+
+ private final TaskActions taskActions = new NoOpTaskActions(); // fixed no-op implementation; no setter is offered
+
+ private final Counter numBytesInCounter = new SimpleCounter(); // fixed counter instance; no setter is offered
+
+ private boolean isCreditBased = true; // default unless setIsCreditBased is called
+
+ public SingleInputGateBuilder setResultPartitionType(ResultPartitionType partitionType) { // overrides the PIPELINED default
+ this.partitionType = partitionType;
+ return this; // returns this builder so calls can be chained
+ }
+
+ SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) { // NOTE(review): package-private, unlike the other setters - confirm this is intended
+ this.consumedSubpartitionIndex = consumedSubpartitionIndex;
+ return this; // returns this builder so calls can be chained
+ }
+
+ public SingleInputGateBuilder setNumberOfChannels(int numberOfChannels) { // overrides the default of 1; the value is not validated here
+ this.numberOfChannels = numberOfChannels;
+ return this; // returns this builder so calls can be chained
+ }
+
+ public SingleInputGateBuilder setIsCreditBased(boolean isCreditBased) { // overrides the default of true
+ this.isCreditBased = isCreditBased;
+ return this; // returns this builder so calls can be chained
+ }
+
+ public SingleInputGate build() { // each call constructs a new gate; gates built from one builder share its jobId, data set ID, task actions and counter instances
+ return new SingleInputGate(
+ "Single Input Gate", // fixed first argument; presumably a display/owner name - confirm against the SingleInputGate constructor
+ jobId,
+ intermediateDataSetID,
+ partitionType,
+ consumedSubpartitionIndex,
+ numberOfChannels,
+ taskActions,
+ numBytesInCounter,
+ isCreditBased);
+ }
+}