You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by zh...@apache.org on 2020/03/02 03:52:02 UTC

[flink] branch master updated (50dcae2 -> d4ca6ea)

This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 50dcae2  [FLINK-16033][table-api] Added test for expression resolution.
     new 569468f  [FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
     new d4ca6ea  [hotfix][tests] Remove the dead codes of StreamTestSingleInputGate and TestInputChannel

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../partition/consumer/SingleInputGate.java        |  6 ++++--
 .../partition/consumer/SingleInputGateFactory.java |  3 +--
 .../partition/consumer/InputChannelBuilder.java    |  6 +++---
 .../IteratorWrappingTestSingleInputGate.java       |  3 +--
 .../partition/consumer/LocalInputChannelTest.java  |  2 +-
 .../partition/consumer/SingleInputGateTest.java    | 22 ++++++++--------------
 .../partition/consumer/TestInputChannel.java       | 15 +--------------
 .../partition/consumer/TestSingleInputGate.java    |  4 +---
 .../partition/consumer/UnionInputGateTest.java     | 10 ++++------
 .../consumer/StreamTestSingleInputGate.java        | 12 +-----------
 10 files changed, 25 insertions(+), 58 deletions(-)


[flink] 01/02: [FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 569468f729e118ab93547d84327ec846cc2646c8
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Wed Feb 26 13:20:59 2020 +0800

    [FLINK-16285][network] Refactor SingleInputGate#setInputChannel to remove IntermediateResultPartitionID argument
    
    The IntermediateResultPartitionID info can be got directly from the respective InputChannel, so we can remove it from
    the arguments of SingleInputGate#setInputChannel to cleanup the codes.
    
    It is also helpful to simplify the unit tests and avoid passing the inconsistent IntermediateResultPartitionID with the
    internal ResultPartitionID that the respective InputChannel maintains.
---
 .../partition/consumer/SingleInputGate.java        |  6 ++++--
 .../partition/consumer/SingleInputGateFactory.java |  3 +--
 .../partition/consumer/InputChannelBuilder.java    |  6 +++---
 .../IteratorWrappingTestSingleInputGate.java       |  3 +--
 .../partition/consumer/LocalInputChannelTest.java  |  2 +-
 .../partition/consumer/SingleInputGateTest.java    | 22 ++++++++--------------
 .../partition/consumer/TestInputChannel.java       |  3 +--
 .../partition/consumer/TestSingleInputGate.java    |  4 +---
 .../partition/consumer/UnionInputGateTest.java     | 10 ++++------
 .../consumer/StreamTestSingleInputGate.java        |  4 +---
 10 files changed, 25 insertions(+), 38 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index f03e347..677b320 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -318,9 +318,11 @@ public class SingleInputGate extends InputGate {
 		}
 	}
 
-	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+	public void setInputChannel(InputChannel inputChannel) {
+		checkNotNull(inputChannel);
+		IntermediateResultPartitionID partitionId = inputChannel.getPartitionId().getPartitionId();
 		synchronized (requestLock) {
-			if (inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)) == null
+			if (inputChannels.put(partitionId, inputChannel) == null
 					&& inputChannel instanceof UnknownInputChannel) {
 
 				numberOfUninitializedChannels++;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index dca505d..a6d3641 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -156,8 +156,7 @@ public class SingleInputGateFactory {
 				shuffleDescriptors[i],
 				channelStatistics,
 				metrics);
-			ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
-			inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
+			inputGate.setInputChannel(inputChannels[i]);
 		}
 
 		LOG.debug("{}: Created {} input channels ({}).",
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
index d8e9cc2..5709ffc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannelBuilder.java
@@ -120,7 +120,7 @@ public class InputChannelBuilder {
 			maxBackoff,
 			metrics,
 			memorySegmentProvider);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 
@@ -134,7 +134,7 @@ public class InputChannelBuilder {
 			initialBackoff,
 			maxBackoff,
 			metrics);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 
@@ -149,7 +149,7 @@ public class InputChannelBuilder {
 			maxBackoff,
 			metrics,
 			memorySegmentProvider);
-		inputGate.setInputChannel(partitionId.getPartitionId(), channel);
+		inputGate.setInputChannel(channel);
 		return channel;
 	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
index 4cbc6c8..7de4034 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/IteratorWrappingTestSingleInputGate.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -96,7 +95,7 @@ public class IteratorWrappingTestSingleInputGate<T extends IOReadableWritable> e
 
 		inputChannel.addBufferAndAvailability(answer);
 
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		return this;
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 44d3e23..9eca591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -286,7 +286,7 @@ public class LocalInputChannelTest {
 		final LocalInputChannel localChannel = createLocalInputChannel(
 			inputGate, new ResultPartitionManager(), 1, 1);
 
-		inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+		inputGate.setInputChannel(localChannel);
 		localChannel.requestSubpartition(0);
 
 		// The timer should be initialized at the first time of retriggering partition request.
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 3de27eb..5606f9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -96,11 +96,8 @@ public class SingleInputGateTest extends InputGateTestBase {
 			new TestInputChannel(inputGate, 1)
 		};
 
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0]);
-
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1]);
+		inputGate.setInputChannel(inputChannels[0]);
+		inputGate.setInputChannel(inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -149,7 +146,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 			assertTrue(compressedBuffer.isCompressed());
 
 			inputChannel.read(compressedBuffer);
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+			inputGate.setInputChannel(inputChannel);
 			inputGate.notifyChannelNonEmpty(inputChannel);
 
 			Optional<BufferOrEvent> bufferOrEvent = inputGate.getNext();
@@ -166,7 +163,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testIsAvailable() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1);
 		TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		testIsAvailable(inputGate, inputGate, inputChannel);
 	}
@@ -175,7 +172,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 	public void testIsAvailableAfterFinished() throws Exception {
 		final SingleInputGate inputGate = createInputGate(1);
 		TestInputChannel inputChannel = new TestInputChannel(inputGate, 0);
-		inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannel);
+		inputGate.setInputChannel(inputChannel);
 
 		testIsAvailableAfterFinished(
 			inputGate,
@@ -195,11 +192,8 @@ public class SingleInputGateTest extends InputGateTestBase {
 			new TestInputChannel(inputGate, 1)
 		};
 
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[0]);
-
-		inputGate.setInputChannel(
-			new IntermediateResultPartitionID(), inputChannels[1]);
+		inputGate.setInputChannel(inputChannels[0]);
+		inputGate.setInputChannel(inputChannels[1]);
 
 		// Test
 		inputChannels[0].readBuffer();
@@ -627,7 +621,7 @@ public class SingleInputGateTest extends InputGateTestBase {
 		final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
 		final ResultPartitionID partitionId = localChannel.getPartitionId();
 
-		inputGate.setInputChannel(partitionId.getPartitionId(), localChannel);
+		inputGate.setInputChannel(localChannel);
 		localChannel.setError(new PartitionNotFoundException(partitionId));
 		try {
 			inputGate.getNext();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 111dae8..3e16327 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -113,7 +112,7 @@ public class TestInputChannel extends InputChannel {
 		for (int i = 0; i < numberOfInputChannels; i++) {
 			mocks[i] = new TestInputChannel(inputGate, i);
 
-			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i]);
+			inputGate.setInputChannel(mocks[i]);
 		}
 
 		return mocks;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index f60cdb9..60d6aa6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
@@ -41,7 +39,7 @@ public class TestSingleInputGate {
 		if (initialize) {
 			for (int i = 0; i < numberOfInputChannels; i++) {
 				inputChannels[i] = new TestInputChannel(inputGate, i);
-				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i]);
+				inputGate.setInputChannel(inputChannels[i]);
 			}
 		}
 	}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 1892836..16ad4cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-
 import org.junit.Test;
 
 import static org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateTest.verifyBufferOrEvent;
@@ -107,11 +105,11 @@ public class UnionInputGateTest extends InputGateTestBase {
 	public void testIsAvailable() throws Exception {
 		final SingleInputGate inputGate1 = createInputGate(1);
 		TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
-		inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+		inputGate1.setInputChannel(inputChannel1);
 
 		final SingleInputGate inputGate2 = createInputGate(1);
 		TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
-		inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+		inputGate2.setInputChannel(inputChannel2);
 
 		testIsAvailable(new UnionInputGate(inputGate1, inputGate2), inputGate1, inputChannel1);
 	}
@@ -120,11 +118,11 @@ public class UnionInputGateTest extends InputGateTestBase {
 	public void testIsAvailableAfterFinished() throws Exception {
 		final SingleInputGate inputGate1 = createInputGate(1);
 		TestInputChannel inputChannel1 = new TestInputChannel(inputGate1, 0);
-		inputGate1.setInputChannel(new IntermediateResultPartitionID(), inputChannel1);
+		inputGate1.setInputChannel(inputChannel1);
 
 		final SingleInputGate inputGate2 = createInputGate(1);
 		TestInputChannel inputChannel2 = new TestInputChannel(inputGate2, 0);
-		inputGate2.setInputChannel(new IntermediateResultPartitionID(), inputChannel2);
+		inputGate2.setInputChannel(inputChannel2);
 
 		testIsAvailableAfterFinished(
 			new UnionInputGate(inputGate1, inputGate2),
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index 4ec6b42..c359060 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.io.network.partition.consumer.TestInputChannel.BufferAndAvailabilityProvider;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 
@@ -124,8 +123,7 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 
 			inputChannels[channelIndex].addBufferAndAvailability(answer);
 
-			inputGate.setInputChannel(new IntermediateResultPartitionID(),
-				inputChannels[channelIndex]);
+			inputGate.setInputChannel(inputChannels[channelIndex]);
 		}
 	}
 


[flink] 02/02: [hotfix][tests] Remove the dead codes of StreamTestSingleInputGate and TestInputChannel

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d4ca6ea2b0061c758f49c1edef562348eb96c9d0
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu Feb 27 12:18:12 2020 +0800

    [hotfix][tests] Remove the dead codes of StreamTestSingleInputGate and TestInputChannel
---
 .../io/network/partition/consumer/TestInputChannel.java      | 12 ------------
 .../partition/consumer/StreamTestSingleInputGate.java        |  8 --------
 2 files changed, 20 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
index 3e16327..d6a2976 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestInputChannel.java
@@ -120,7 +120,6 @@ public class TestInputChannel extends InputChannel {
 
 	@Override
 	void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
-
 	}
 
 	@Override
@@ -141,7 +140,6 @@ public class TestInputChannel extends InputChannel {
 
 	@Override
 	void sendTaskEvent(TaskEvent event) throws IOException {
-
 	}
 
 	@Override
@@ -155,26 +153,16 @@ public class TestInputChannel extends InputChannel {
 
 	@Override
 	void releaseAllResources() throws IOException {
-
 	}
 
 	@Override
 	protected void notifyChannelNonEmpty() {
-
-	}
-
-	public void assertReturnedDataBuffersAreRecycled() {
-		assertReturnedBuffersAreRecycled(true, false);
 	}
 
 	public void assertReturnedEventsAreRecycled() {
 		assertReturnedBuffersAreRecycled(false, true);
 	}
 
-	public void assertAllReturnedBuffersAreRecycled() {
-		assertReturnedBuffersAreRecycled(true, true);
-	}
-
 	private void assertReturnedBuffersAreRecycled(boolean assertBuffers, boolean assertEvents) {
 		for (Buffer b : allReturnedBuffers) {
 			if (b.isBuffer() && assertBuffers && !b.isRecycled()) {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index c359060..8f297cf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -157,14 +157,6 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 	 * Returns true iff all input queues are empty.
 	 */
 	public boolean allQueuesEmpty() {
-//		for (int i = 0; i < numInputChannels; i++) {
-//			synchronized (inputQueues[i]) {
-//				inputQueues[i].add(InputValue.<T>event(new DummyEvent()));
-//				inputQueues[i].notifyAll();
-//				inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
-//			}
-//		}
-
 		for (int i = 0; i < numInputChannels; i++) {
 			if (inputQueues[i].size() > 0) {
 				return false;