You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/05/10 23:34:12 UTC

[flink] 13/14: [hotfix] [tests] Move utility methods into correct test class.

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 422f7b54f0bf66f51e792d24942cb7d951219da0
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Thu Apr 25 17:53:59 2019 +0200

    [hotfix] [tests] Move utility methods into correct test class.
    
    The methods were intended to be generic for tests across partitions and hence placed in the
    SubpartitionTestBase.
    
    However, the SubpartitionTestBase can only really test common contract behavior, like behavior on
    disposal, finishing, and buffer recycling contracts in those cases. Producer / consumer behavior
    is sufficiently different between both implementations that it does not make sense at this point
    to try and share the tests.
---
 .../PipelinedSubpartitionWithReadViewTest.java     | 93 +++++++++++++++++++++-
 .../io/network/partition/SubpartitionTestBase.java | 91 +--------------------
 2 files changed, 91 insertions(+), 93 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index 6f9920e..af4eb05 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
@@ -31,14 +36,14 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
-import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
-import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
 import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Mockito.mock;
 
@@ -273,4 +278,86 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
 		assertEquals(4, availablityListener.getNumNotifications());
 	}
+
+	// ------------------------------------------------------------------------
+
+	static void assertNextBuffer(
+			ResultSubpartitionView readView,
+			int expectedReadableBufferSize,
+			boolean expectedIsMoreAvailable,
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		assertNextBufferOrEvent(
+				readView,
+				expectedReadableBufferSize,
+				true,
+				null,
+				expectedIsMoreAvailable,
+				expectedBuffersInBacklog,
+				expectedNextBufferIsEvent,
+				expectedRecycledAfterRecycle);
+	}
+
+	static void assertNextEvent(
+			ResultSubpartitionView readView,
+			int expectedReadableBufferSize,
+			Class<? extends AbstractEvent> expectedEventClass,
+			boolean expectedIsMoreAvailable,
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		assertNextBufferOrEvent(
+				readView,
+				expectedReadableBufferSize,
+				false,
+				expectedEventClass,
+				expectedIsMoreAvailable,
+				expectedBuffersInBacklog,
+				expectedNextBufferIsEvent,
+				expectedRecycledAfterRecycle);
+	}
+
+	private static void assertNextBufferOrEvent(
+			ResultSubpartitionView readView,
+			int expectedReadableBufferSize,
+			boolean expectedIsBuffer,
+			@Nullable Class<? extends AbstractEvent> expectedEventClass,
+			boolean expectedIsMoreAvailable,
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
+		ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
+		assertNotNull(bufferAndBacklog);
+		try {
+			assertEquals("buffer size", expectedReadableBufferSize,
+					bufferAndBacklog.buffer().readableBytes());
+			assertEquals("buffer or event", expectedIsBuffer,
+					bufferAndBacklog.buffer().isBuffer());
+			if (expectedEventClass != null) {
+				Assert.assertThat(EventSerializer
+								.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
+						instanceOf(expectedEventClass));
+			}
+			assertEquals("more available", expectedIsMoreAvailable,
+					bufferAndBacklog.isMoreAvailable());
+			assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
+			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+					bufferAndBacklog.nextBufferIsEvent());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+					readView.nextBufferIsEvent());
+
+			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
+		} finally {
+			bufferAndBacklog.buffer().recycleBuffer();
+		}
+		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
+	}
+
+	static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
+		assertNull(readView.getNextBuffer());
+	}
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index e4006be..7c083ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,27 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 /**
@@ -135,84 +126,4 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		// Verify that parent release is reflected at partition view
 		assertTrue(view.isReleased());
 	}
-
-	static void assertNextBuffer(
-			ResultSubpartitionView readView,
-			int expectedReadableBufferSize,
-			boolean expectedIsMoreAvailable,
-			int expectedBuffersInBacklog,
-			boolean expectedNextBufferIsEvent,
-			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
-		assertNextBufferOrEvent(
-			readView,
-			expectedReadableBufferSize,
-			true,
-			null,
-			expectedIsMoreAvailable,
-			expectedBuffersInBacklog,
-			expectedNextBufferIsEvent,
-			expectedRecycledAfterRecycle);
-	}
-
-	static void assertNextEvent(
-			ResultSubpartitionView readView,
-			int expectedReadableBufferSize,
-			Class<? extends AbstractEvent> expectedEventClass,
-			boolean expectedIsMoreAvailable,
-			int expectedBuffersInBacklog,
-			boolean expectedNextBufferIsEvent,
-			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
-		assertNextBufferOrEvent(
-			readView,
-			expectedReadableBufferSize,
-			false,
-			expectedEventClass,
-			expectedIsMoreAvailable,
-			expectedBuffersInBacklog,
-			expectedNextBufferIsEvent,
-			expectedRecycledAfterRecycle);
-	}
-
-	private static void assertNextBufferOrEvent(
-			ResultSubpartitionView readView,
-			int expectedReadableBufferSize,
-			boolean expectedIsBuffer,
-			@Nullable Class<? extends AbstractEvent> expectedEventClass,
-			boolean expectedIsMoreAvailable,
-			int expectedBuffersInBacklog,
-			boolean expectedNextBufferIsEvent,
-			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
-		checkArgument(expectedEventClass == null || !expectedIsBuffer);
-
-		ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
-		assertNotNull(bufferAndBacklog);
-		try {
-			assertEquals("buffer size", expectedReadableBufferSize,
-				bufferAndBacklog.buffer().readableBytes());
-			assertEquals("buffer or event", expectedIsBuffer,
-				bufferAndBacklog.buffer().isBuffer());
-			if (expectedEventClass != null) {
-				assertThat(EventSerializer
-						.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
-					instanceOf(expectedEventClass));
-			}
-			assertEquals("more available", expectedIsMoreAvailable,
-				bufferAndBacklog.isMoreAvailable());
-			assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
-			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
-			assertEquals("next is event", expectedNextBufferIsEvent,
-				bufferAndBacklog.nextBufferIsEvent());
-			assertEquals("next is event", expectedNextBufferIsEvent,
-				readView.nextBufferIsEvent());
-
-			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
-		} finally {
-			bufferAndBacklog.buffer().recycleBuffer();
-		}
-		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
-	}
-
-	static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
-		assertNull(readView.getNextBuffer());
-	}
 }