You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2020/08/28 14:48:20 UTC

[flink] 04/09: [refactor][tests] Change ResultPartitionFactoryTest release-on-consumption testing to test behavior not implementation.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 064cfbd32d5bf26e6e25834243ddbac38d2c32ef
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Mon Aug 24 18:37:45 2020 +0200

    [refactor][tests] Change ResultPartitionFactoryTest release-on-consumption testing to test behavior not implementation.
    
    This prepares for the removal of certain subclasses that are currently used in the implementation test.
---
 .../partition/ResultPartitionFactoryTest.java      | 28 +++++++++++++++++-----
 1 file changed, 22 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index acb07cb..61af900 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -33,8 +33,10 @@ import org.junit.Test;
 import java.util.Arrays;
 
 import static org.hamcrest.CoreMatchers.instanceOf;
-import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests for the {@link ResultPartitionFactory}.
@@ -72,18 +74,26 @@ public class ResultPartitionFactoryTest extends TestLogger {
 	@Test
 	public void testConsumptionOnReleaseForPipelined() {
 		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.PIPELINED);
-		assertThat(resultPartition, instanceOf(ReleaseOnConsumptionResultPartition.class));
+
+		resultPartition.onConsumedSubpartition(0);
+
+		assertTrue(resultPartition.isReleased());
 	}
 
 	@Test
 	public void testNoConsumptionOnReleaseForBlocking() {
 		final ResultPartition resultPartition = createResultPartition(ResultPartitionType.BLOCKING);
-		assertThat(resultPartition, not(instanceOf(ReleaseOnConsumptionResultPartition.class)));
+
+		resultPartition.onConsumedSubpartition(0);
+
+		assertFalse(resultPartition.isReleased());
 	}
 
 	private static ResultPartition createResultPartition(ResultPartitionType partitionType) {
-		ResultPartitionFactory factory = new ResultPartitionFactory(
-			new ResultPartitionManager(),
+		final ResultPartitionManager manager = new ResultPartitionManager();
+
+		final ResultPartitionFactory factory = new ResultPartitionFactory(
+			manager,
 			fileChannelManager,
 			new NetworkBufferPool(1, SEGMENT_SIZE),
 			BoundedBlockingSubpartitionType.AUTO,
@@ -104,6 +114,12 @@ public class ResultPartitionFactoryTest extends TestLogger {
 			true
 		);
 
-		return factory.create("test", 0, descriptor);
+		// guard our test assumptions
+		assertEquals(1, descriptor.getNumberOfSubpartitions());
+
+		final ResultPartition partition =  factory.create("test", 0, descriptor);
+		manager.registerResultPartition(partition);
+
+		return partition;
 	}
 }