You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:27 UTC
[13/19] flink git commit: [FLINK-5277] [tests] Add unit tests for
ResultPartition#add() in case of failures
[FLINK-5277] [tests] Add unit tests for ResultPartition#add() in case of failures
This verifies that the given network buffer is recycled as expected and that
no notifiers are called upon failures to add a buffer.
This closes #3309
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ceb7d82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ceb7d82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ceb7d82
Branch: refs/heads/master
Commit: 1ceb7d82eccf4dc77482bddb61a664fd7f226b2b
Parents: 5e32eb5
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Feb 14 17:42:28 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:23 2017 +0100
----------------------------------------------------------------------
.../network/partition/ResultPartitionTest.java | 75 ++++++++++++++++++++
1 file changed, 75 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1ceb7d82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f6562a1..0cd3591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Assert;
import org.junit.Test;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -70,6 +73,78 @@ public class ResultPartitionTest {
}
}
+ /**
+  * Runs {@link #testAddOnFinishedPartition} for a {@link ResultPartitionType#PIPELINED}
+  * partition.
+  */
+ @Test
+ public void testAddOnFinishedPipelinedPartition() throws Exception {
+     final ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
+     testAddOnFinishedPartition(partitionType);
+ }
+
+ /**
+  * Runs {@link #testAddOnFinishedPartition} for a {@link ResultPartitionType#BLOCKING}
+  * partition.
+  */
+ @Test
+ public void testAddOnFinishedBlockingPartition() throws Exception {
+     final ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
+     testAddOnFinishedPartition(partitionType);
+ }
+
+ /**
+  * Tests {@link ResultPartition#add} on a partition which has already finished.
+  *
+  * <p>The test expects the call to throw an {@link IllegalStateException}, the buffer
+  * passed to it to be recycled nonetheless, and the consumable notifier not to be called.
+  *
+  * @param pipelined the result partition type to set up
+  * @throws Exception if setting up or finishing the partition fails unexpectedly
+  */
+ protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
+         throws Exception {
+     Buffer buffer = TestBufferFactory.createBuffer();
+     ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+     try {
+         ResultPartition partition = createPartition(notifier, pipelined, true);
+         partition.finish();
+         // discard the interactions recorded up to here so that the never() check
+         // below only sees calls made by add()
+         reset(notifier);
+         // partition.add() should fail
+         partition.add(buffer, 0);
+         Assert.fail("exception expected");
+     } catch (IllegalStateException e) {
+         // expected => ignored
+     } finally {
+         if (!buffer.isRecycled()) {
+             // recycle first: Assert.fail() always throws, so a recycle() placed
+             // after it would never run and the buffer would leak
+             buffer.recycle();
+             Assert.fail("buffer not recycled");
+         }
+         // should not have notified either
+         verify(notifier, never()).notifyPartitionConsumable(
+             any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+     }
+ }
+
+ /**
+  * Runs {@link #testAddOnReleasedPartition} for a {@link ResultPartitionType#PIPELINED}
+  * partition.
+  */
+ @Test
+ public void testAddOnReleasedPipelinedPartition() throws Exception {
+     final ResultPartitionType partitionType = ResultPartitionType.PIPELINED;
+     testAddOnReleasedPartition(partitionType);
+ }
+
+ /**
+  * Runs {@link #testAddOnReleasedPartition} for a {@link ResultPartitionType#BLOCKING}
+  * partition.
+  */
+ @Test
+ public void testAddOnReleasedBlockingPartition() throws Exception {
+     final ResultPartitionType partitionType = ResultPartitionType.BLOCKING;
+     testAddOnReleasedPartition(partitionType);
+ }
+
+ /**
+  * Tests {@link ResultPartition#add} on a partition which has already been released.
+  *
+  * <p>The test expects the call to return without throwing, the buffer passed to it to
+  * be recycled, and the consumable notifier not to be called.
+  *
+  * @param pipelined the result partition type to set up
+  * @throws Exception if setting up, releasing, or adding to the partition fails unexpectedly
+  */
+ protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
+         throws Exception {
+     Buffer buffer = TestBufferFactory.createBuffer();
+     ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+     try {
+         ResultPartition partition = createPartition(notifier, pipelined, true);
+         partition.release();
+         // partition.add() silently drops the buffer but recycles it
+         partition.add(buffer, 0);
+     } finally {
+         if (!buffer.isRecycled()) {
+             // recycle first: Assert.fail() always throws, so a recycle() placed
+             // after it would never run and the buffer would leak
+             buffer.recycle();
+             Assert.fail("buffer not recycled");
+         }
+         // should not have notified either
+         verify(notifier, never()).notifyPartitionConsumable(
+             any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+     }
+ }
+
// ------------------------------------------------------------------------
private static ResultPartition createPartition(