You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/23 16:18:43 UTC

[flink] 03/08: [hotfix][network, tests] Add new unit test for LocalInputChannel#requestSubpartition

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 16 16:53:51 2019 +0800

    [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition
    
    FLIP-1 relies on LocalInputChannel#requestSubpartition throwing a PartitionNotFoundException if the partition
    has not been registered in the ResultPartitionManager beforehand, so new unit tests are added to cover this case.
---
 .../partition/consumer/SingleInputGate.java        |  5 ++
 .../partition/consumer/LocalInputChannelTest.java  | 72 ++++++++++++++++++++++
 2 files changed, 77 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 6c23698..63504bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate {
 		}
 	}
 
+	@VisibleForTesting
+	Timer getRetriggerLocalRequestTimer() {
+		return retriggerLocalRequestTimer;
+	}
+
 	@Override
 	public void close() throws IOException {
 		boolean released = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 505f792..a3bc696 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
 import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -247,6 +250,75 @@ public class LocalInputChannelTest {
 	}
 
 	/**
+	 * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+	 * if the result partition is not registered in the {@link ResultPartitionManager} and no backoff is configured.
+	 */
+	@Test
+	public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
+		final SingleInputGate inputGate = createSingleInputGate(1);
+		final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+		try {
+			localChannel.requestSubpartition(0);
+
+			fail("Should throw a PartitionNotFoundException.");
+		} catch (PartitionNotFoundException notFound) {
+			assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId()));
+		}
+	}
+
+	/**
+	 * Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered
+	 * when {@link LocalInputChannel#requestSubpartition(int)} encounters a {@link PartitionNotFoundException}
+	 * while the channel is still within its backoff limit.
+	 */
+	@Test
+	public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
+		final SingleInputGate inputGate = createSingleInputGate(1);
+		final LocalInputChannel localChannel = createLocalInputChannel(
+			inputGate, new ResultPartitionManager(), 1, 1);
+
+		inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+		localChannel.requestSubpartition(0);
+
+		// The timer should be initialized the first time a partition request is retriggered.
+		assertNotNull(inputGate.getRetriggerLocalRequestTimer());
+	}
+
+	/**
+	 * Tests that a failing {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} sets the
+	 * {@link PartitionNotFoundException} on the input channel, where {@code checkError()} rethrows it.
+	 */
+	@Test
+	public void testChannelErrorWhileRetriggeringRequest() {
+		final SingleInputGate inputGate = createSingleInputGate(1);
+		final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+		final Timer timer = new Timer(true) {
+			@Override
+			public void schedule(TimerTask task, long delay) {
+				task.run();
+
+				try {
+					localChannel.checkError();
+
+					fail("Should throw a PartitionNotFoundException.");
+				} catch (PartitionNotFoundException notFound) {
+					assertThat(localChannel.partitionId, Matchers.is(notFound.getPartitionId()));
+				} catch (IOException ex) {
+					fail("Should throw a PartitionNotFoundException.");
+				}
+			}
+		};
+
+		try {
+			localChannel.retriggerSubpartitionRequest(timer, 0);
+		} finally {
+			timer.cancel();
+		}
+	}
+
+	/**
 	 * Verifies that concurrent release via the SingleInputGate and re-triggering
 	 * of a partition request works smoothly.
 	 *