You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/02 03:52:03 UTC
[flink] 01/02: [FLINK-16285][network] Refactor
SingleInputGate#setInputChannel to remove IntermediateResultPartitionID
argument
This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 569468f729e118ab93547d84327ec846cc2646c8
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Feb 26 13:20:59 2020 +0800
[FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
The IntermediateResultPartitionID info can be got directly from the respective InputChannel, so we can remove it from
the arguments of SingleInputGate#setInputChannel to cleanup the codes.
It is also helpful to simplify the unit tests and avoid passing the inconsistent IntermediateResultPartitionID with the
internal ResultPartitionID that the respective InputChannel maintains.
---
.../partition/consumer/SingleInputGate.java | 6 ++++--
.../partition/consumer/SingleInputGateFactory.java | 3 +--
.../partition/consumer/InputChannelBuilder.java | 6 +++---
.../IteratorWrappingTestSingleInputGate.java | 3 +--
.../partition/consumer/LocalInputChannelTest.java | 2 +-
.../partition/consumer/SingleInputGateTest.java | 22 ++++++++--------------
.../partition/consumer/TestInputChannel.java | 3 +--
.../partition/consumer/TestSingleInputGate.java | 4 +---
.../partition/consumer/UnionInputGateTest.java | 10 ++++------
.../consumer/StreamTestSingleInputGate.java | 4 +---
10 files changed, 25 insertions(+), 38 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f03e347..677b320 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -318,9 +318,11 @@ public class SingleInputGate extends InputGate {
}
}
- public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+ public void setInputChannel(InputChannel inputChannel) {
+ checkNotNull(inputChannel);
+ IntermediateResultPartitionID partitionId = inputChannel.getPartitionId().getPartitionId();
synchronized (requestLock) {
- if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+ if (inputChannels.put(partitionId, inputChannel) == null
&& inputChannel instanceof UnknownInputChannel) {
numberOfUninitializedChannels++;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index dca505d..a6d3641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -156,8 +156,7 @@ public class SingleInputGateFactory {
shuffleDescriptors[i],
channelStatistics,
metrics);
- ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
- inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
+ inputGate.setInputChannel(inputChannels[i]);
}
LOG.debug("{}: Created {} input channels ({}).",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index d8e9cc2..5709ffc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -120,7 +120,7 @@ public class InputChannelBuilder {
maxBackoff,
metrics,
memorySegmentProvider);
- inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ inputGate.setInputChannel(channel);
return channel;
}
@@ -134,7 +134,7 @@ public class InputChannelBuilder {
initialBackoff,
maxBackoff,
metrics);
- inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ inputGate.setInputChannel(channel);
return channel;
}
@@ -149,7 +149,7 @@ public class InputChannelBuilder {
maxBackoff,
metrics,
memorySegmentProvider);
- inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+ inputGate.setInputChannel(channel);
return channel;
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4cbc6c8..7de4034 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
@@ -96,7 +95,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
inputChannel.addBufferAndAvailability(answer);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+ inputGate.setInputChannel(inputChannel);
return this;
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 44d3e23..9eca591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -286,7 +286,7 @@ public class LocalInputChannelTest {
final LocalInputChannel localChannel = createLocalInputChannel(
inputGate, new ResultPartitionManager(), 1, 1);
- inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+ inputGate.setInputChannel(localChannel);
localChannel.requestSubpartition(0);
// The timer should be initialized at the first time of retriggering partition request.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3de27eb..5606f9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -96,11 +96,8 @@ public class SingleInputGateTest extends InputGateTestBase {
new TestInputChannel(inputGate, 1)
};
- inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[0]);
-
- inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[1]);
+ inputGate.setInputChannel(inputChannels[0]);
+ inputGate.setInputChannel(inputChannels[1]);
// Test
inputChannels[0].readBuffer();
@@ -149,7 +146,7 @@ public class SingleInputGateTest extends InputGateTestBase {
assertTrue(compressedBuffer.isCompressed());
inputChannel.read(compressedBuffer);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+ inputGate.setInputChannel(inputChannel);
inputGate.notifyChannelNonEmpty(inputChannel);
Optional<BufferOrEvent> bufferOrEvent = inputGate.getNext();
@@ -166,7 +163,7 @@ public class SingleInputGateTest extends InputGateTestBase {
public void testIsAvailable() throws Exception {
final SingleInputGate inputGate = createInputGate(1);
TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+ inputGate.setInputChannel(inputChannel);
testIsAvailable(inputGate, inputGate, inputChannel);
}
@@ -175,7 +172,7 @@ public class SingleInputGateTest extends InputGateTestBase {
public void testIsAvailableAfterFinished() throws Exception {
final SingleInputGate inputGate = createInputGate(1);
TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+ inputGate.setInputChannel(inputChannel);
testIsAvailableAfterFinished(
inputGate,
@@ -195,11 +192,8 @@ public class SingleInputGateTest extends InputGateTestBase {
new TestInputChannel(inputGate, 1)
};
- inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[0]);
-
- inputGate.setInputChannel(
- new IntermediateResultPartitionID(), inputChannels[1]);
+ inputGate.setInputChannel(inputChannels[0]);
+ inputGate.setInputChannel(inputChannels[1]);
// Test
inputChannels[0].readBuffer();
@@ -627,7 +621,7 @@ public class SingleInputGateTest extends InputGateTestBase {
final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
final ResultPartitionID partitionId = localChannel.getPartitionId();
- inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
+ inputGate.setInputChannel(localChannel);
localChannel.setError(new PartitionNotFoundException(partitionId));
try {
inputGate.getNext();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 111dae8..3e16327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import java.io.IOException;
import java.util.ArrayList;
@@ -113,7 +112,7 @@ public class TestInputChannel extends InputChannel {
for (int i = 0; i < numberOfInputChannels; i++) {
mocks[i] = new TestInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i]);
+ inputGate.setInputChannel(mocks[i]);
}
return mocks;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index f60cdb9..60d6aa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -41,7 +39,7 @@ public class TestSingleInputGate {
if (initialize) {
for (int i = 0; i < numberOfInputChannels; i++) {
inputChannels[i] = new TestInputChannel(inputGate, i);
- inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
+ inputGate.setInputChannel(inputChannels[i]);
}
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 1892836..16ad4cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
import org.junit.Test;
import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
@@ -107,11 +105,11 @@ public class UnionInputGateTest extends InputGateTestBase {
public void testIsAvailable() throws Exception {
final SingleInputGate inputGate1 = createInputGate(1);
TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
- inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+ inputGate1.setInputChannel(inputChannel1);
final SingleInputGate inputGate2 = createInputGate(1);
TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
- inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+ inputGate2.setInputChannel(inputChannel2);
testIsAvailable(new UnionInputGate(inputGate1, inputGate2), inputGate1, inputChannel1);
}
@@ -120,11 +118,11 @@ public class UnionInputGateTest extends InputGateTestBase {
public void testIsAvailableAfterFinished() throws Exception {
final SingleInputGate inputGate1 = createInputGate(1);
TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
- inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+ inputGate1.setInputChannel(inputChannel1);
final SingleInputGate inputGate2 = createInputGate(1);
TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
- inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+ inputGate2.setInputChannel(inputChannel2);
testIsAvailableAfterFinished(
new UnionInputGate(inputGate1, inputGate2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 4ec6b42..c359060 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
@@ -124,8 +123,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
inputChannels[channelIndex].addBufferAndAvailability(answer);
- inputGate.setInputChannel(new IntermediateResultPartitionID(),
- inputChannels[channelIndex]);
+ inputGate.setInputChannel(inputChannels[channelIndex]);
}
}