You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/05/23 16:18:43 UTC
[flink] 03/08: [hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 22f87ee9761b06eeccf8c5adcbd9f4aa96803302
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Thu May 16 16:53:51 2019 +0800
[hotfix][network,tests] Add new unit test for LocalInputChannel#requestSubpartition
FLIP-1 requires that LocalInputChannel#requestSubpartition throws a PartitionNotFoundException if the partition
has not been registered in the ResultPartitionManager beforehand. New unit tests are added to cover this case.
---
.../partition/consumer/SingleInputGate.java | 5 ++
.../partition/consumer/LocalInputChannelTest.java | 72 ++++++++++++++++++++++
2 files changed, 77 insertions(+)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 6c23698..63504bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -410,6 +410,11 @@ public class SingleInputGate extends InputGate {
}
}
+ @VisibleForTesting
+ Timer getRetriggerLocalRequestTimer() {
+ return retriggerLocalRequestTimer;
+ }
+
@Override
public void close() throws IOException {
boolean released = false;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 505f792..a3bc696 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.function.CheckedSupplier;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -60,6 +61,8 @@ import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtil
import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createSingleInputGate;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
@@ -247,6 +250,75 @@ public class LocalInputChannelTest {
}
/**
+ * Tests that {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+ * if the result partition was not registered in {@link ResultPartitionManager} and no backoff is configured.
+ */
+ @Test
+ public void testPartitionNotFoundExceptionWhileRequestingPartition() throws Exception {
+ final SingleInputGate inputGate = createSingleInputGate(1);
+ final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+ try {
+ localChannel.requestSubpartition(0);
+
+ fail("Should throw a PartitionNotFoundException.");
+ } catch (PartitionNotFoundException notFound) {
+ assertThat(localChannel.getPartitionId(), Matchers.is(notFound.getPartitionId()));
+ }
+ }
+
+ /**
+ * Tests that {@link SingleInputGate#retriggerPartitionRequest(IntermediateResultPartitionID)} is triggered
+ * after {@link LocalInputChannel#requestSubpartition(int)} throws {@link PartitionNotFoundException}
+ * within backoff.
+ */
+ @Test
+ public void testRetriggerPartitionRequestWhilePartitionNotFound() throws Exception {
+ final SingleInputGate inputGate = createSingleInputGate(1);
+ final LocalInputChannel localChannel = createLocalInputChannel(
+ inputGate, new ResultPartitionManager(), 1, 1);
+
+ inputGate.setInputChannel(localChannel.getPartitionId().getPartitionId(), localChannel);
+ localChannel.requestSubpartition(0);
+
+ // The timer should be initialized the first time a partition request is retriggered.
+ assertNotNull(inputGate.getRetriggerLocalRequestTimer());
+ }
+
+ /**
+ * Tests that {@link LocalInputChannel#retriggerSubpartitionRequest(Timer, int)} fails with a
+ * {@link PartitionNotFoundException}, which is then set as the error on the input channel.
+ */
+ @Test
+ public void testChannelErrorWhileRetriggeringRequest() {
+ final SingleInputGate inputGate = createSingleInputGate(1);
+ final LocalInputChannel localChannel = createLocalInputChannel(inputGate, new ResultPartitionManager());
+
+ final Timer timer = new Timer(true) {
+ @Override
+ public void schedule(TimerTask task, long delay) {
+ task.run();
+
+ try {
+ localChannel.checkError();
+
+ fail("Should throw a PartitionNotFoundException.");
+ } catch (PartitionNotFoundException notFound) {
+ assertThat(localChannel.partitionId, Matchers.is(notFound.getPartitionId()));
+ } catch (IOException ex) {
+ fail("Should throw a PartitionNotFoundException.");
+ }
+ }
+ };
+
+ try {
+ localChannel.retriggerSubpartitionRequest(timer, 0);
+ } finally {
+ timer.cancel();
+ }
+ }
+
+ /**
* Verifies that concurrent release via the SingleInputGate and re-triggering
* of a partition request works smoothly.
*