You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/05/22 10:26:28 UTC
[flink] 03/10: [hotfix][tests][network] Introduce
InputChannelBuilder for creation of InputChannels in tests
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit e6b8423ee34c2c49f0ea6eb0955ced5db57dd18f
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Sat May 11 19:49:30 2019 +0200
[hotfix][tests][network] Introduce InputChannelBuilder for creation of InputChannels in tests
---
.../partition/consumer/RemoteInputChannel.java | 11 --
.../runtime/io/network/NetworkEnvironmentTest.java | 20 +--
.../netty/PartitionRequestClientHandlerTest.java | 22 +---
.../network/partition/InputChannelTestUtils.java | 47 ++-----
.../network/partition/InputGateConcurrentTest.java | 12 +-
.../network/partition/InputGateFairnessTest.java | 10 +-
.../partition/consumer/InputChannelBuilder.java | 145 +++++++++++++++++++++
.../partition/consumer/LocalInputChannelTest.java | 19 +--
.../partition/consumer/RemoteInputChannelTest.java | 68 +++-------
.../partition/consumer/SingleInputGateTest.java | 107 ++++-----------
10 files changed, 228 insertions(+), 233 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 87e3f62..30246c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -108,17 +108,6 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
ResultPartitionID partitionId,
ConnectionID connectionId,
ConnectionManager connectionManager,
- InputChannelMetrics metrics) {
-
- this(inputGate, channelIndex, partitionId, connectionId, connectionManager, 0, 0, metrics);
- }
-
- public RemoteInputChannel(
- SingleInputGate inputGate,
- int channelIndex,
- ResultPartitionID partitionId,
- ConnectionID connectionId,
- ConnectionManager connectionManager,
int initialBackOff,
int maxBackoff,
InputChannelMetrics metrics) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
index a610838..81cd8f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java
@@ -19,11 +19,10 @@
package org.apache.flink.runtime.io.network;
import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateBuilder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -288,7 +287,7 @@ public class NetworkEnvironmentTest {
* @param numberOfChannels
* the number of input channels
*
- * @return input gate with some fake settings
+ * @return input gate with some fake settiFngs
*/
private SingleInputGate createSingleInputGate(ResultPartitionType partitionType, int numberOfChannels) {
return spy(new SingleInputGateBuilder()
@@ -303,15 +302,10 @@ public class NetworkEnvironmentTest {
int channelIndex,
ResultPartition resultPartition,
ConnectionManager connManager) {
- RemoteInputChannel channel = new RemoteInputChannel(
- inputGate,
- channelIndex,
- resultPartition.getPartitionId(),
- mock(ConnectionID.class),
- connManager,
- 0,
- 0,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
- inputGate.setInputChannel(resultPartition.getPartitionId().getPartitionId(), channel);
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(channelIndex)
+ .setPartitionId(resultPartition.getPartitionId())
+ .setConnectionManager(connManager)
+ .buildRemoteAndSetToGate(inputGate);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
index 487ab1a..16a2415 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
@@ -27,9 +27,9 @@ import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -211,7 +211,7 @@ public class PartitionRequestClientHandlerTest {
* @return The new created remote input channel.
*/
static RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) throws Exception {
- return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));
+ return createRemoteInputChannel(inputGate, null);
}
/**
@@ -243,19 +243,11 @@ public class PartitionRequestClientHandlerTest {
when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
.thenReturn(client);
- ResultPartitionID partitionId = new ResultPartitionID();
- RemoteInputChannel inputChannel = new RemoteInputChannel(
- inputGate,
- 0,
- partitionId,
- mock(ConnectionID.class),
- connectionManager,
- initialBackoff,
- maxBackoff,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
- inputGate.setInputChannel(partitionId.getPartitionId(), inputChannel);
- return inputChannel;
+ return InputChannelBuilder.newBuilder()
+ .setConnectionManager(connectionManager)
+ .setInitialBackoff(initialBackoff)
+ .setMaxBackoff(maxBackoff)
+ .buildRemoteAndSetToGate(inputGate);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
index 82b4c5b..806c556 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputChannelTestUtils.java
@@ -20,9 +20,9 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -32,8 +32,6 @@ import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import java.net.InetSocketAddress;
-
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Mockito.mock;
@@ -43,9 +41,6 @@ import static org.mockito.Mockito.when;
* Some utility methods used for testing InputChannels and InputGates.
*/
public class InputChannelTestUtils {
-
- private static final ConnectionID STUB_CONNECTION_ID = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
-
/**
* Creates a result partition manager that ignores all IDs, and simply returns the given
* subpartitions in sequence.
@@ -96,15 +91,10 @@ public class InputChannelTestUtils {
int channelIndex,
ResultPartitionManager partitionManager) {
- return new LocalInputChannel(
- inputGate,
- channelIndex,
- new ResultPartitionID(),
- partitionManager,
- new TaskEventDispatcher(),
- 0,
- 0,
- newUnregisteredInputChannelMetrics());
+ return InputChannelBuilder.newBuilder()
+ .setChannelIndex(channelIndex)
+ .setPartitionManager(partitionManager)
+ .buildLocalAndSetToGate(inputGate);
}
public static LocalInputChannel createLocalInputChannel(
@@ -113,15 +103,11 @@ public class InputChannelTestUtils {
int initialBackoff,
int maxBackoff) {
- return new LocalInputChannel(
- inputGate,
- 0,
- new ResultPartitionID(),
- partitionManager,
- new TaskEventDispatcher(),
- initialBackoff,
- maxBackoff,
- newUnregisteredInputChannelMetrics());
+ return InputChannelBuilder.newBuilder()
+ .setPartitionManager(partitionManager)
+ .setInitialBackoff(initialBackoff)
+ .setMaxBackoff(maxBackoff)
+ .buildLocalAndSetToGate(inputGate);
}
public static RemoteInputChannel createRemoteInputChannel(
@@ -129,15 +115,10 @@ public class InputChannelTestUtils {
int channelIndex,
ConnectionManager connectionManager) {
- return new RemoteInputChannel(
- inputGate,
- channelIndex,
- new ResultPartitionID(),
- STUB_CONNECTION_ID,
- connectionManager,
- 0,
- 0,
- newUnregisteredInputChannelMetrics());
+ return InputChannelBuilder.newBuilder()
+ .setChannelIndex(channelIndex)
+ .setConnectionManager(connectionManager)
+ .buildRemoteAndSetToGate(inputGate);
}
public static InputChannelMetrics newUnregisteredInputChannelMetrics() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index f753f0f..95dc380 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.runtime.io.network.ConnectionManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
-import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -63,9 +62,7 @@ public class InputGateConcurrentTest {
final SingleInputGate gate = createSingleInputGate(numberOfChannels);
for (int i = 0; i < numberOfChannels; i++) {
- LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
+ createLocalInputChannel(gate, i, resultPartitionManager);
partitions[i] = new PipelinedSubpartition(0, resultPartition);
sources[i] = new PipelinedSubpartitionSource(partitions[i]);
}
@@ -92,8 +89,6 @@ public class InputGateConcurrentTest {
for (int i = 0; i < numberOfChannels; i++) {
RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
sources[i] = new RemoteChannelSource(channel);
}
@@ -137,14 +132,11 @@ public class InputGateConcurrentTest {
localPartitions[local++] = psp;
sources[i] = new PipelinedSubpartitionSource(psp);
- LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+ createLocalInputChannel(gate, i, resultPartitionManager);
}
else {
//remote channel
RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
-
sources[i] = new RemoteChannelSource(channel);
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index ada4d7f..6ed5f21 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
-import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
@@ -92,8 +91,7 @@ public class InputGateFairnessTest {
final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
for (int i = 0; i < numberOfChannels; i++) {
- LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+ createLocalInputChannel(gate, i, resultPartitionManager);
}
// read all the buffers and the EOF event
@@ -138,8 +136,7 @@ public class InputGateFairnessTest {
final SingleInputGate gate = createFairnessVerifyingInputGate(numberOfChannels);
for (int i = 0; i < numberOfChannels; i++) {
- LocalInputChannel channel = createLocalInputChannel(gate, i, resultPartitionManager);
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
+ createLocalInputChannel(gate, i, resultPartitionManager);
}
// seed one initial buffer
@@ -192,8 +189,6 @@ public class InputGateFairnessTest {
channel.onBuffer(mockBuffer, p, -1);
}
channel.onBuffer(EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE), buffersPerChannel, -1);
-
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
}
// read all the buffers and the EOF event
@@ -234,7 +229,6 @@ public class InputGateFairnessTest {
for (int i = 0; i < numberOfChannels; i++) {
RemoteInputChannel channel = createRemoteInputChannel(gate, i, connManager);
channels[i] = channel;
- gate.setInputChannel(channel.getPartitionId().getPartitionId(), channel);
}
channels[11].onBuffer(mockBuffer, 0, -1);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
new file mode 100644
index 0000000..549dcc5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.ConnectionID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.LocalConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.TaskEventPublisher;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Builder for various {@link InputChannel} types.
+ */
+public class InputChannelBuilder {
+ static final ConnectionID STUB_CONNECTION_ID =
+ new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
+
+ private int channelIndex = 0;
+ private ResultPartitionID partitionId = new ResultPartitionID();
+ private ConnectionID connectionID = STUB_CONNECTION_ID;
+ private ResultPartitionManager partitionManager = new ResultPartitionManager();
+ private TaskEventPublisher taskEventPublisher = new TaskEventDispatcher();
+ private ConnectionManager connectionManager = new LocalConnectionManager();
+ private int initialBackoff = 0;
+ private int maxBackoff = 0;
+ private InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics();
+
+ public static InputChannelBuilder newBuilder() {
+ return new InputChannelBuilder();
+ }
+
+ public InputChannelBuilder setChannelIndex(int channelIndex) {
+ this.channelIndex = channelIndex;
+ return this;
+ }
+
+ public InputChannelBuilder setPartitionId(ResultPartitionID partitionId) {
+ this.partitionId = partitionId;
+ return this;
+ }
+
+ public InputChannelBuilder setPartitionManager(ResultPartitionManager partitionManager) {
+ this.partitionManager = partitionManager;
+ return this;
+ }
+
+ InputChannelBuilder setTaskEventPublisher(TaskEventPublisher taskEventPublisher) {
+ this.taskEventPublisher = taskEventPublisher;
+ return this;
+ }
+
+ public InputChannelBuilder setConnectionManager(ConnectionManager connectionManager) {
+ this.connectionManager = connectionManager;
+ return this;
+ }
+
+ public InputChannelBuilder setInitialBackoff(int initialBackoff) {
+ this.initialBackoff = initialBackoff;
+ return this;
+ }
+
+ public InputChannelBuilder setMaxBackoff(int maxBackoff) {
+ this.maxBackoff = maxBackoff;
+ return this;
+ }
+
+ public InputChannelBuilder setMetrics(InputChannelMetrics metrics) {
+ this.metrics = metrics;
+ return this;
+ }
+
+ InputChannelBuilder setupFromNetworkEnvironment(NetworkEnvironment network) {
+ this.partitionManager = network.getResultPartitionManager();
+ this.connectionManager = network.getConnectionManager();
+ this.initialBackoff = network.getConfiguration().partitionRequestInitialBackoff();
+ this.maxBackoff = network.getConfiguration().partitionRequestMaxBackoff();
+ return this;
+ }
+
+ UnknownInputChannel buildUnknownAndSetToGate(SingleInputGate inputGate) {
+ UnknownInputChannel channel = new UnknownInputChannel(
+ inputGate,
+ channelIndex,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ connectionManager,
+ initialBackoff,
+ maxBackoff,
+ metrics);
+ inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ return channel;
+ }
+
+ public LocalInputChannel buildLocalAndSetToGate(SingleInputGate inputGate) {
+ LocalInputChannel channel = new LocalInputChannel(
+ inputGate,
+ channelIndex,
+ partitionId,
+ partitionManager,
+ taskEventPublisher,
+ initialBackoff,
+ maxBackoff,
+ metrics);
+ inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ return channel;
+ }
+
+ public RemoteInputChannel buildRemoteAndSetToGate(SingleInputGate inputGate) {
+ RemoteInputChannel channel = new RemoteInputChannel(
+ inputGate,
+ channelIndex,
+ partitionId,
+ connectionID,
+ connectionManager,
+ initialBackoff,
+ maxBackoff,
+ metrics);
+ inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ return channel;
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 4654998..0c25b24 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
@@ -36,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -297,8 +295,6 @@ public class LocalInputChannelTest {
final LocalInputChannel channel = createLocalInputChannel(gate, partitionManager, 1, 1);
- gate.setInputChannel(new IntermediateResultPartitionID(), channel);
-
Thread releaser = new Thread() {
@Override
public void run() {
@@ -444,15 +440,12 @@ public class LocalInputChannelTest {
// Setup input channels
for (int i = 0; i < numberOfInputChannels; i++) {
- inputGate.setInputChannel(
- new IntermediateResultPartitionID(),
- new LocalInputChannel(
- inputGate,
- i,
- consumedPartitionIds[i],
- partitionManager,
- taskEventDispatcher,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics()));
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(i)
+ .setPartitionManager(partitionManager)
+ .setPartitionId(consumedPartitionIds[i])
+ .setTaskEventPublisher(taskEventDispatcher)
+ .buildLocalAndSetToGate(inputGate);
}
this.numberOfInputChannels = numberOfInputChannels;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 1e571e7..67bc6f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -47,8 +47,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import scala.Tuple2;
-
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasProperty;
@@ -189,28 +187,24 @@ public class RemoteInputChannelTest {
@Test(expected = IllegalStateException.class)
public void testRetriggerWithoutPartitionRequest() throws Exception {
- Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
PartitionRequestClient connClient = mock(PartitionRequestClient.class);
SingleInputGate inputGate = mock(SingleInputGate.class);
- RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+ RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 3000);
ch.retriggerSubpartitionRequest(0);
}
@Test
public void testPartitionRequestExponentialBackoff() throws Exception {
- // Config
- Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 3000);
-
// Start with initial backoff, then keep doubling, and cap at max.
- int[] expectedDelays = {backoff._1(), 1000, 2000, backoff._2()};
+ int[] expectedDelays = {500, 1000, 2000, 3000};
// Setup
PartitionRequestClient connClient = mock(PartitionRequestClient.class);
SingleInputGate inputGate = mock(SingleInputGate.class);
- RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+ RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 3000);
// Initial request
ch.requestSubpartition(0);
@@ -235,14 +229,11 @@ public class RemoteInputChannelTest {
@Test
public void testPartitionRequestSingleBackoff() throws Exception {
- // Config
- Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(500, 500);
-
// Setup
PartitionRequestClient connClient = mock(PartitionRequestClient.class);
SingleInputGate inputGate = mock(SingleInputGate.class);
- RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+ RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 500, 500);
// No delay for first request
ch.requestSubpartition(0);
@@ -250,7 +241,7 @@ public class RemoteInputChannelTest {
// Initial delay for second request
ch.retriggerSubpartitionRequest(0);
- verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(backoff._1()));
+ verify(connClient).requestSubpartition(eq(ch.partitionId), eq(0), eq(ch), eq(500));
// Exception after backoff is greater than the maximum backoff.
try {
@@ -264,14 +255,11 @@ public class RemoteInputChannelTest {
@Test
public void testPartitionRequestNoBackoff() throws Exception {
- // Config
- Tuple2<Integer, Integer> backoff = new Tuple2<Integer, Integer>(0, 0);
-
// Setup
PartitionRequestClient connClient = mock(PartitionRequestClient.class);
SingleInputGate inputGate = mock(SingleInputGate.class);
- RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, backoff);
+ RemoteInputChannel ch = createRemoteInputChannel(inputGate, connClient, 0, 0);
// No delay for first request
ch.requestSubpartition(0);
@@ -297,13 +285,10 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = mock(SingleInputGate.class);
- final RemoteInputChannel ch = new RemoteInputChannel(
- inputGate,
- 0,
- partitionId,
- mock(ConnectionID.class),
- connectionManager,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
+ final RemoteInputChannel ch = InputChannelBuilder.newBuilder()
+ .setPartitionId(partitionId)
+ .setConnectionManager(connectionManager)
+ .buildRemoteAndSetToGate(inputGate);
ch.onFailedPartitionRequest();
@@ -346,7 +331,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -486,7 +470,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -562,7 +545,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -654,9 +636,6 @@ public class RemoteInputChannelTest {
final RemoteInputChannel channel1 = spy(createRemoteInputChannel(inputGate));
final RemoteInputChannel channel2 = spy(createRemoteInputChannel(inputGate));
final RemoteInputChannel channel3 = spy(createRemoteInputChannel(inputGate));
- inputGate.setInputChannel(channel1.partitionId.getPartitionId(), channel1);
- inputGate.setInputChannel(channel2.partitionId.getPartitionId(), channel2);
- inputGate.setInputChannel(channel3.partitionId.getPartitionId(), channel3);
Throwable thrown = null;
try {
final BufferPool bufferPool = spy(networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers));
@@ -724,14 +703,11 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel successfulRemoteIC = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(successfulRemoteIC.partitionId.getPartitionId(), successfulRemoteIC);
-
successfulRemoteIC.requestSubpartition(0);
// late creation -> no exclusive buffers, also no requested subpartition in successfulRemoteIC
// (to trigger a failure in RemoteInputChannel#notifyBufferAvailable())
final RemoteInputChannel failingRemoteIC = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(failingRemoteIC.partitionId.getPartitionId(), failingRemoteIC);
Buffer buffer = null;
Throwable thrown = null;
@@ -789,7 +765,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -852,7 +827,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -904,7 +878,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -959,7 +932,6 @@ public class RemoteInputChannelTest {
final SingleInputGate inputGate = createSingleInputGate(1);
final RemoteInputChannel inputChannel = createRemoteInputChannel(inputGate);
- inputGate.setInputChannel(inputChannel.partitionId.getPartitionId(), inputChannel);
Throwable thrown = null;
try {
final BufferPool bufferPool = networkBufferPool.createBufferPool(numFloatingBuffers, numFloatingBuffers);
@@ -1027,29 +999,25 @@ public class RemoteInputChannelTest {
private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate)
throws IOException, InterruptedException {
- return createRemoteInputChannel(
- inputGate, mock(PartitionRequestClient.class), new Tuple2<Integer, Integer>(0, 0));
+ return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class), 0, 0);
}
private RemoteInputChannel createRemoteInputChannel(
SingleInputGate inputGate,
PartitionRequestClient partitionRequestClient,
- Tuple2<Integer, Integer> initialAndMaxRequestBackoff)
+ int initialBackoff,
+ int maxBackoff)
throws IOException, InterruptedException {
final ConnectionManager connectionManager = mock(ConnectionManager.class);
when(connectionManager.createPartitionRequestClient(any(ConnectionID.class)))
.thenReturn(partitionRequestClient);
- return new RemoteInputChannel(
- inputGate,
- 0,
- new ResultPartitionID(),
- mock(ConnectionID.class),
- connectionManager,
- initialAndMaxRequestBackoff._1(),
- initialAndMaxRequestBackoff._2(),
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
+ return InputChannelBuilder.newBuilder()
+ .setConnectionManager(connectionManager)
+ .setInitialBackoff(initialBackoff)
+ .setMaxBackoff(maxBackoff)
+ .buildRemoteAndSetToGate(inputGate);
}
/**
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 07fcd26..ef09d80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -26,9 +26,6 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.ConnectionManager;
-import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.NetworkEnvironmentBuilder;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -36,7 +33,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -52,7 +48,6 @@ import org.apache.flink.runtime.taskmanager.NoOpTaskActions;
import org.junit.Test;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
@@ -179,21 +174,24 @@ public class SingleInputGateTest extends InputGateTestBase {
inputGate.setBufferPool(bufferPool);
- final InputChannelMetrics metrics = InputChannelTestUtils.newUnregisteredInputChannelMetrics();
-
// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, metrics);
+ InputChannelBuilder.newBuilder()
+ .setPartitionId(localPartitionId)
+ .setPartitionManager(partitionManager)
+ .setTaskEventPublisher(taskEventDispatcher)
+ .buildLocalAndSetToGate(inputGate);
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, metrics);
-
- // Set channels
- inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
- inputGate.setInputChannel(unknownPartitionId.getPartitionId(), unknown);
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(1)
+ .setPartitionId(unknownPartitionId)
+ .setPartitionManager(partitionManager)
+ .setTaskEventPublisher(taskEventDispatcher)
+ .buildUnknownAndSetToGate(inputGate);
// Request partitions
inputGate.requestPartitions();
@@ -227,18 +225,9 @@ public class SingleInputGateTest extends InputGateTestBase {
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
- InputChannel unknown = new UnknownInputChannel(
- inputGate,
- 0,
- new ResultPartitionID(),
- partitionManager,
- new TaskEventDispatcher(),
- new LocalConnectionManager(),
- 0,
- 0,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
- inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
+ InputChannel unknown = InputChannelBuilder.newBuilder()
+ .setPartitionManager(partitionManager)
+ .buildUnknownAndSetToGate(inputGate);
// Update to a local channel and verify that no request is triggered
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
@@ -260,18 +249,7 @@ public class SingleInputGateTest extends InputGateTestBase {
// Setup the input gate with a single channel that does nothing
final SingleInputGate inputGate = createInputGate(1);
- InputChannel unknown = new UnknownInputChannel(
- inputGate,
- 0,
- new ResultPartitionID(),
- new ResultPartitionManager(),
- new TaskEventDispatcher(),
- new LocalConnectionManager(),
- 0,
- 0,
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
-
- inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
+ InputChannelBuilder.newBuilder().buildUnknownAndSetToGate(inputGate);
// Start the consumer in a separate Thread
Thread asyncConsumer = new Thread() {
@@ -335,7 +313,7 @@ public class SingleInputGateTest extends InputGateTestBase {
// Remote
new InputChannelDeploymentDescriptor(
partitionIds[1],
- ResultPartitionLocation.createRemote(new ConnectionID(new InetSocketAddress("localhost", 5000), 0))),
+ ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)),
// Unknown
new InputChannelDeploymentDescriptor(
partitionIds[2],
@@ -415,16 +393,14 @@ public class SingleInputGateTest extends InputGateTestBase {
final NetworkEnvironment network = createNetworkEnvironment();
try {
- final ResultPartitionID resultPartitionId = new ResultPartitionID();
- final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
- addRemoteInputChannel(network, inputGate, connectionId, resultPartitionId, 0);
-
+ RemoteInputChannel remote =
+ InputChannelBuilder.newBuilder()
+ .setupFromNetworkEnvironment(network)
+ .buildRemoteAndSetToGate(inputGate);
network.setupInputGate(inputGate);
NetworkBufferPool bufferPool = network.getNetworkBufferPool();
if (enableCreditBasedFlowControl) {
- RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
- .get(resultPartitionId.getPartitionId());
// only the exclusive buffers should be assigned/available now
assertEquals(buffersPerChannel, remote.getNumberOfAvailableBuffers());
@@ -469,10 +445,9 @@ public class SingleInputGateTest extends InputGateTestBase {
}
// Trigger updates to remote input channel from unknown input channel
- final ConnectionID connectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
resultPartitionId,
- ResultPartitionLocation.createRemote(connectionId)));
+ ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)));
if (enableCreditBasedFlowControl) {
RemoteInputChannel remote = (RemoteInputChannel) inputGate.getInputChannels()
@@ -517,10 +492,9 @@ public class SingleInputGateTest extends InputGateTestBase {
is(instanceOf((UnknownInputChannel.class))));
// Trigger updates to remote input channel from unknown input channel
- final ConnectionID remoteConnectionId = new ConnectionID(new InetSocketAddress("localhost", 5000), 0);
inputGate.updateInputChannel(new InputChannelDeploymentDescriptor(
remoteResultPartitionId,
- ResultPartitionLocation.createRemote(remoteConnectionId)));
+ ResultPartitionLocation.createRemote(InputChannelBuilder.STUB_CONNECTION_ID)));
assertThat(inputGate.getInputChannels().get(remoteResultPartitionId.getPartitionId()),
is(instanceOf((RemoteInputChannel.class))));
@@ -549,38 +523,11 @@ public class SingleInputGateTest extends InputGateTestBase {
SingleInputGate inputGate,
ResultPartitionID partitionId,
int channelIndex) {
- UnknownInputChannel unknown =
- createUnknownInputChannel(network, inputGate, partitionId, channelIndex);
- inputGate.setInputChannel(partitionId.getPartitionId(), unknown);
- }
-
- private UnknownInputChannel createUnknownInputChannel(
- NetworkEnvironment network,
- SingleInputGate inputGate,
- ResultPartitionID partitionId,
- int channelIndex) {
- return new UnknownInputChannel(
- inputGate,
- channelIndex,
- partitionId,
- network.getResultPartitionManager(),
- new TaskEventDispatcher(),
- network.getConnectionManager(),
- network.getConfiguration().partitionRequestInitialBackoff(),
- network.getConfiguration().partitionRequestMaxBackoff(),
- InputChannelTestUtils.newUnregisteredInputChannelMetrics());
- }
-
- private void addRemoteInputChannel(
- NetworkEnvironment network,
- SingleInputGate inputGate,
- ConnectionID connectionId,
- ResultPartitionID partitionId,
- int channelIndex) {
- RemoteInputChannel remote =
- createUnknownInputChannel(network, inputGate, partitionId, channelIndex)
- .toRemoteInputChannel(connectionId);
- inputGate.setInputChannel(partitionId.getPartitionId(), remote);
+ InputChannelBuilder.newBuilder()
+ .setChannelIndex(channelIndex)
+ .setPartitionId(partitionId)
+ .setupFromNetworkEnvironment(network)
+ .buildUnknownAndSetToGate(inputGate);
}
private NetworkEnvironment createNetworkEnvironment() {