You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/02 03:52:03 UTC

[flink] 01/02: [FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 569468f729e118ab93547d84327ec846cc2646c8
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Feb 26 13:20:59 2020 +0800

    [FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
    
    The IntermediateResultPartitionID info can be got directly from the respective InputChannel, so we can remove it from
    the arguments of SingleInputGate#setInputChannel to cleanup the codes.
    
    It is also helpful to simplify the unit tests and avoid passing the inconsistent IntermediateResultPartitionID with the
    internal ResultPartitionID that the respective InputChannel maintains.
---
 .../partition/consumer/SingleInputGate.java        |  6 ++++--
 .../partition/consumer/SingleInputGateFactory.java |  3 +--
 .../partition/consumer/InputChannelBuilder.java    |  6 +++---
 .../IteratorWrappingTestSingleInputGate.java       |  3 +--
 .../partition/consumer/LocalInputChannelTest.java  |  2 +-
 .../partition/consumer/SingleInputGateTest.java    | 22 ++++++++--------------
 .../partition/consumer/TestInputChannel.java       |  3 +--
 .../partition/consumer/TestSingleInputGate.java    |  4 +---
 .../partition/consumer/UnionInputGateTest.java     | 10 ++++------
 .../consumer/StreamTestSingleInputGate.java        |  4 +---
 10 files changed, 25 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f03e347..677b320 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -318,9 +318,11 @@ public class SingleInputGate extends InputGate {
 		}
 	}
 
-	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+	public void setInputChannel(InputChannel inputChannel) {
+		checkNotNull(inputChannel);
+		IntermediateResultPartitionID partitionId = inputChannel.getPartitionId().getPartitionId();
 		synchronized (requestLock) {
-			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+			if (inputChannels.put(partitionId, inputChannel) == null
 					&& inputChannel instanceof UnknownInputChannel) {
 
 				numberOfUninitializedChannels++;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index dca505d..a6d3641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -156,8 +156,7 @@ public class SingleInputGateFactory {
 				shuffleDescriptors[i],
 				channelStatistics,
 				metrics);
-			ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
-			inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
+			inputGate.setInputChannel(inputChannels[i]);
 		}
 
 		LOG.debug("{}: Created {} input channels ({}).",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index d8e9cc2..5709ffc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -120,7 +120,7 @@ public class InputChannelBuilder {
 			maxBackoff,
 			metrics,
 			memorySegmentProvider);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 
@@ -134,7 +134,7 @@ public class InputChannelBuilder {
 			initialBackoff,
 			maxBackoff,
 			metrics);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 
@@ -149,7 +149,7 @@ public class InputChannelBuilder {
 			maxBackoff,
 			metrics,
 			memorySegmentProvider);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4cbc6c8..7de4034 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -96,7 +95,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 		inputChannel.addBufferAndAvailability(answer);
 
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		return this;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 44d3e23..9eca591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -286,7 +286,7 @@ public class LocalInputChannelTest {
 		final LocalInputChannel localChannel = createLocalInputChannel(
 			inputGate, new ResultPartitionManager(), 1, 1);
 
-		inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+		inputGate.setInputChannel(localChannel);
 		localChannel.requestSubpartition(0);
 
 		// The timer should be initialized at the first time of retriggering partition request.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3de27eb..5606f9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -96,11 +96,8 @@ public class SingleInputGateTest extends InputGateTestBase {
 			new TestInputChannel(inputGate, 1)
 		};
 
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0]);
-
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1]);
+		inputGate.setInputChannel(inputChannels[0]);
+		inputGate.setInputChannel(inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -149,7 +146,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			assertTrue(compressedBuffer.isCompressed());
 
 			inputChannel.read(compressedBuffer);
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+			inputGate.setInputChannel(inputChannel);
 			inputGate.notifyChannelNonEmpty(inputChannel);
 
 			Optional<BufferOrEvent> bufferOrEvent = inputGate.getNext();
@@ -166,7 +163,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testIsAvailable() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1);
 		TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		testIsAvailable(inputGate, inputGate, inputChannel);
 	}
@@ -175,7 +172,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testIsAvailableAfterFinished() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1);
 		TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		testIsAvailableAfterFinished(
 			inputGate,
@@ -195,11 +192,8 @@ public class SingleInputGateTest extends InputGateTestBase {
 			new TestInputChannel(inputGate, 1)
 		};
 
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0]);
-
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1]);
+		inputGate.setInputChannel(inputChannels[0]);
+		inputGate.setInputChannel(inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -627,7 +621,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 		final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
 		final ResultPartitionID partitionId = localChannel.getPartitionId();
 
-		inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
+		inputGate.setInputChannel(localChannel);
 		localChannel.setError(new PartitionNotFoundException(partitionId));
 		try {
 			inputGate.getNext();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 111dae8..3e16327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -113,7 +112,7 @@ public class TestInputChannel extends InputChannel {
 		for (int i = 0; i < numberOfInputChannels; i++) {
 			mocks[i] = new TestInputChannel(inputGate, i);
 
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i]);
+			inputGate.setInputChannel(mocks[i]);
 		}
 
 		return mocks;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index f60cdb9..60d6aa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -41,7 +39,7 @@ public class TestSingleInputGate {
 		if (initialize) {
 			for (int i = 0; i < numberOfInputChannels; i++) {
 				inputChannels[i] = new TestInputChannel(inputGate, i);
-				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
+				inputGate.setInputChannel(inputChannels[i]);
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 1892836..16ad4cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import org.junit.Test;
 
 import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
@@ -107,11 +105,11 @@ public class UnionInputGateTest extends InputGateTestBase {
 	public void testIsAvailable() throws Exception {
 		final SingleInputGate inputGate1 = createInputGate(1);
 		TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
-		inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+		inputGate1.setInputChannel(inputChannel1);
 
 		final SingleInputGate inputGate2 = createInputGate(1);
 		TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
-		inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+		inputGate2.setInputChannel(inputChannel2);
 
 		testIsAvailable(new UnionInputGate(inputGate1, inputGate2), inputGate1, inputChannel1);
 	}
@@ -120,11 +118,11 @@ public class UnionInputGateTest extends InputGateTestBase {
 	public void testIsAvailableAfterFinished() throws Exception {
 		final SingleInputGate inputGate1 = createInputGate(1);
 		TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
-		inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+		inputGate1.setInputChannel(inputChannel1);
 
 		final SingleInputGate inputGate2 = createInputGate(1);
 		TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
-		inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+		inputGate2.setInputChannel(inputChannel2);
 
 		testIsAvailableAfterFinished(
 			new UnionInputGate(inputGate1, inputGate2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 4ec6b42..c359060 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
@@ -124,8 +123,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 			inputChannels[channelIndex].addBufferAndAvailability(answer);
 
-			inputGate.setInputChannel(new IntermediateResultPartitionID(),
-				inputChannels[channelIndex]);
+			inputGate.setInputChannel(inputChannels[channelIndex]);
 		}
 	}