You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/02/20 00:55:27 UTC

[13/19] flink git commit: [FLINK-5277] [tests] Add unit tests for ResultPartition#add() in case of failures

[FLINK-5277] [tests] Add unit tests for ResultPartition#add() in case of failures

This verifies that the given network buffer is recycled as expected and that
no notifiers are called upon failures to add a buffer.

This closes #3309


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1ceb7d82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1ceb7d82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1ceb7d82

Branch: refs/heads/master
Commit: 1ceb7d82eccf4dc77482bddb61a664fd7f226b2b
Parents: 5e32eb5
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Feb 14 17:42:28 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 20 01:01:23 2017 +0100

----------------------------------------------------------------------
 .../network/partition/ResultPartitionTest.java  | 75 ++++++++++++++++++++
 1 file changed, 75 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1ceb7d82/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
index f6562a1..0cd3591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
@@ -20,13 +20,16 @@ package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
 import org.apache.flink.runtime.taskmanager.TaskActions;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -70,6 +73,78 @@ public class ResultPartitionTest {
 		}
 	}
 
+	@Test
+	public void testAddOnFinishedPipelinedPartition() throws Exception {
+		testAddOnFinishedPartition(ResultPartitionType.PIPELINED); // add-after-finish check for the PIPELINED type
+	}
+
+	@Test
+	public void testAddOnFinishedBlockingPartition() throws Exception {
+		testAddOnFinishedPartition(ResultPartitionType.BLOCKING); // add-after-finish check for the BLOCKING type
+	}
+
+	/**
+	 * Tests that {@link ResultPartition#add} on an already finished partition throws, recycles the buffer and does not notify.
+	 *
+	 * @param pipelined the result partition type to set up
+	 */
+	protected void testAddOnFinishedPartition(final ResultPartitionType pipelined)
+		throws Exception {
+		Buffer buffer = TestBufferFactory.createBuffer();
+		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		try {
+			ResultPartition partition = createPartition(notifier, pipelined, true);
+			partition.finish();
+			reset(notifier); // forget interactions recorded so far; only the add() call below is checked
+			// partition.add() should fail
+			partition.add(buffer, 0);
+			Assert.fail("exception expected");
+		} catch (IllegalStateException e) {
+			// expected => ignored
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle(); // clean up first: Assert.fail() throws, so nothing placed after it would run
+				Assert.fail("buffer not recycled");
+			}
+			// should not have notified either
+			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+		}
+	}
+
+	@Test
+	public void testAddOnReleasedPipelinedPartition() throws Exception {
+		testAddOnReleasedPartition(ResultPartitionType.PIPELINED); // add-after-release check for the PIPELINED type
+	}
+
+	@Test
+	public void testAddOnReleasedBlockingPartition() throws Exception {
+		testAddOnReleasedPartition(ResultPartitionType.BLOCKING); // add-after-release check for the BLOCKING type
+	}
+
+	/**
+	 * Tests that {@link ResultPartition#add} on an already released partition recycles the buffer and does not notify.
+	 *
+	 * @param pipelined the result partition type to set up
+	 */
+	protected void testAddOnReleasedPartition(final ResultPartitionType pipelined)
+		throws Exception {
+		Buffer buffer = TestBufferFactory.createBuffer();
+		ResultPartitionConsumableNotifier notifier = mock(ResultPartitionConsumableNotifier.class);
+		try {
+			ResultPartition partition = createPartition(notifier, pipelined, true);
+			partition.release();
+			// partition.add() silently drops the buffer but recycles it
+			partition.add(buffer, 0);
+		} finally {
+			if (!buffer.isRecycled()) {
+				buffer.recycle(); // clean up first: Assert.fail() throws, so nothing placed after it would run
+				Assert.fail("buffer not recycled");
+			}
+			// should not have notified either
+			verify(notifier, never()).notifyPartitionConsumable(any(JobID.class), any(ResultPartitionID.class), any(TaskActions.class));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static ResultPartition createPartition(