You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/03/09 18:58:20 UTC

[6/6] flink git commit: [FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests

[FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests

Also improve the subpartition tests in general to reduce some duplication.

This closes #5581


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1a969f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1a969f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1a969f7

Branch: refs/heads/release-1.5
Commit: d1a969f7ad018ef44f40f974eb49ba004494fcdf
Parents: 835adcc
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 12:13:20 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 9 17:01:54 2018 +0100

----------------------------------------------------------------------
 .../partition/SpillableSubpartitionView.java    |   2 +-
 .../partition/PipelinedSubpartitionTest.java    |  11 +-
 .../partition/SpillableSubpartitionTest.java    | 130 ++++++-------------
 .../network/partition/SubpartitionTestBase.java |  78 ++++++++++-
 4 files changed, 121 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 0f51bc8..65790d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -167,7 +167,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
 				parent.updateStatistics(current);
 				// if we are spilled (but still process a non-spilled nextBuffer), we don't know the
-				// state of nextBufferIsEvent...
+				// state of nextBufferIsEvent or whether more buffers are available
 				if (spilledView == null) {
 					return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index ee678ab..bc66c9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -135,7 +135,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
 			subpartition.add(bufferBuilder.createBufferConsumer());
 
-			assertNextBuffer(readView, 1024, false, 1);
+			// note that since the buffer builder is not finished, there is still a retained instance!
+			assertNextBuffer(readView, 1024, false, 1, false, false);
 			assertEquals(1, subpartition.getBuffersInBacklog());
 		} finally {
 			readView.releaseAllResources();
@@ -157,7 +158,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			subpartition.add(createFilledBufferConsumer(1025)); // finished
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 
-			assertNextBuffer(readView, 1025, false, 1);
+			assertNextBuffer(readView, 1025, false, 1, false, true);
 		} finally {
 			subpartition.release();
 		}
@@ -178,8 +179,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 			subpartition.flush();
 
-			assertNextBuffer(readView, 1025, true, 1);
-			assertNextBuffer(readView, 1024, false, 1);
+			assertNextBuffer(readView, 1025, true, 1, false, true);
+			assertNextBuffer(readView, 1024, false, 1, false, false);
 		} finally {
 			subpartition.release();
 		}
@@ -208,7 +209,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 			subpartition.add(createFilledBufferConsumer(1024));
 			assertEquals(2, availablityListener.getNumNotifications());
 
-			assertNextBuffer(readView, 1024, false, 0);
+			assertNextBuffer(readView, 1024, false, 0, false, true);
 		} finally {
 			readView.releaseAllResources();
 			subpartition.release();

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index e41a85c..840669e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
 
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -52,7 +52,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
@@ -190,10 +189,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpillableSubpartition partition = createSubpartition();
 
 		BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+		BufferConsumer eventBufferConsumer =
+			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+		final int eventSize = eventBufferConsumer.getWrittenBytes();
 
 		partition.add(bufferConsumer.copy());
 		partition.add(bufferConsumer.copy());
-		partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+		partition.add(eventBufferConsumer);
 		partition.add(bufferConsumer);
 
 		assertEquals(4, partition.getTotalNumberOfBuffers());
@@ -207,13 +209,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		// still same statistics
 		assertEquals(4, partition.getTotalNumberOfBuffers());
 		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes());
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes());
 
 		partition.finish();
 		// + one EndOfPartitionEvent
 		assertEquals(5, partition.getTotalNumberOfBuffers());
 		assertEquals(3, partition.getBuffersInBacklog());
-		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes());
 
 		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
 		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
@@ -221,59 +223,24 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(1, listener.getNumNotifications());
 
 		assertFalse(reader.nextBufferIsEvent()); // buffer
-		BufferAndBacklog read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
 		assertEquals(2, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertFalse(read.nextBufferIsEvent());
 
 		assertFalse(reader.nextBufferIsEvent()); // buffer
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
 		assertEquals(1, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertTrue(read.nextBufferIsEvent());
 
 		assertTrue(reader.nextBufferIsEvent()); // event
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
+		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
 		assertEquals(1, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		read.buffer().recycleBuffer();
-		assertFalse(read.nextBufferIsEvent());
 
 		assertFalse(reader.nextBufferIsEvent()); // buffer
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
 		assertEquals(0, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertTrue(read.nextBufferIsEvent());
 
 		assertTrue(reader.nextBufferIsEvent()); // end of partition event
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
+		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
 		assertEquals(0, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertEquals(EndOfPartitionEvent.class,
-			EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertFalse(read.nextBufferIsEvent());
 
 		// finally check that the bufferConsumer has been freed after a successful (or failed) write
 		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
@@ -292,10 +259,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		SpillableSubpartition partition = createSubpartition();
 
 		BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+		BufferConsumer eventBufferConsumer =
+			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+		final int eventSize = eventBufferConsumer.getWrittenBytes();
 
 		partition.add(bufferConsumer.copy());
 		partition.add(bufferConsumer.copy());
-		partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+		partition.add(eventBufferConsumer);
 		partition.add(bufferConsumer);
 		partition.finish();
 
@@ -311,17 +281,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertFalse(bufferConsumer.isRecycled());
 
 		assertFalse(reader.nextBufferIsEvent());
-		BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled)
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
+		// first buffer (non-spilled)
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
 		assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
 		assertEquals(2, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		read.buffer().recycleBuffer();
-		assertTrue(read.isMoreAvailable());
 		assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification
 		assertFalse(bufferConsumer.isRecycled());
-		assertFalse(read.nextBufferIsEvent());
 
 		// Spill now
 		assertEquals(3, partition.releaseMemory());
@@ -330,59 +295,44 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 		assertEquals(5, partition.getTotalNumberOfBuffers());
 		assertEquals(2, partition.getBuffersInBacklog());
 		// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
-		assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
+		assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes());
 
+		// wait until all buffers have been spilled successfully (before that, we must not access any spilled buffer and cannot rely on isMoreAvailable!)
 		listener.awaitNotifications(2, 30_000);
 		// Spiller finished
 		assertEquals(2, listener.getNumNotifications());
 
+		// after consuming and releasing the next buffer, the bufferConsumer may be freed,
+		// depending on the timing of the last write operation
+		// -> retain once so that we can check below
+		Buffer buffer = bufferConsumer.build();
+		buffer.retainBuffer();
+
 		assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
 		assertEquals(1, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		read.buffer().recycleBuffer();
-		// now the bufferConsumer may be freed, depending on the timing of the write operation
-		// -> let's do this check at the end of the test (to save some time)
-		assertTrue(read.nextBufferIsEvent());
+
+		bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
 
 		assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertFalse(read.buffer().isBuffer());
-		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(1, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		read.buffer().recycleBuffer();
-		assertFalse(read.nextBufferIsEvent());
 
 		assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertTrue(read.buffer().isBuffer());
-		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertTrue(read.nextBufferIsEvent());
+
+		buffer.recycleBuffer();
+		assertTrue(buffer.isRecycled());
 
 		// End of partition
 		assertTrue(reader.nextBufferIsEvent());
-		read = reader.getNextBuffer();
-		assertNotNull(read);
-		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
+		assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
 		assertEquals(0, partition.getBuffersInBacklog());
-		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
-		assertEquals(EndOfPartitionEvent.class,
-			EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
-		assertFalse(read.buffer().isRecycled());
-		read.buffer().recycleBuffer();
-		assertTrue(read.buffer().isRecycled());
-		assertFalse(read.nextBufferIsEvent());
 
 		// finally check that the bufferConsumer has been freed after a successful (or failed) write
 		final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs

http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index a3f18f6..8c90215 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,19 +18,26 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
@@ -138,11 +145,74 @@ public abstract class SubpartitionTestBase extends TestLogger {
 			ResultSubpartitionView readView,
 			int expectedReadableBufferSize,
 			boolean expectedIsMoreAvailable,
-			int expectedBuffersInBacklog) throws IOException, InterruptedException {
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		assertNextBufferOrEvent(
+			readView,
+			expectedReadableBufferSize,
+			true,
+			null,
+			expectedIsMoreAvailable,
+			expectedBuffersInBacklog,
+			expectedNextBufferIsEvent,
+			expectedRecycledAfterRecycle);
+	}
+
+	static void assertNextEvent(
+			ResultSubpartitionView readView,
+			int expectedReadableBufferSize,
+			Class<? extends AbstractEvent> expectedEventClass,
+			boolean expectedIsMoreAvailable,
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		assertNextBufferOrEvent(
+			readView,
+			expectedReadableBufferSize,
+			false,
+			expectedEventClass,
+			expectedIsMoreAvailable,
+			expectedBuffersInBacklog,
+			expectedNextBufferIsEvent,
+			expectedRecycledAfterRecycle);
+	}
+
+	private static void assertNextBufferOrEvent(
+			ResultSubpartitionView readView,
+			int expectedReadableBufferSize,
+			boolean expectedIsBuffer,
+			@Nullable Class<? extends AbstractEvent> expectedEventClass,
+			boolean expectedIsMoreAvailable,
+			int expectedBuffersInBacklog,
+			boolean expectedNextBufferIsEvent,
+			boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+		checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
 		ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
-		assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
-		assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
-		assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+		assertNotNull(bufferAndBacklog);
+		try {
+			assertEquals("buffer size", expectedReadableBufferSize,
+				bufferAndBacklog.buffer().readableBytes());
+			assertEquals("buffer or event", expectedIsBuffer,
+				bufferAndBacklog.buffer().isBuffer());
+			if (expectedEventClass != null) {
+				assertThat(EventSerializer
+						.fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
+					instanceOf(expectedEventClass));
+			}
+			assertEquals("more available", expectedIsMoreAvailable,
+				bufferAndBacklog.isMoreAvailable());
+			assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
+			assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+			assertEquals("next is event", expectedNextBufferIsEvent,
+				bufferAndBacklog.nextBufferIsEvent());
+
+			assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
+		} finally {
+			bufferAndBacklog.buffer().recycleBuffer();
+		}
+		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
 	}
 
 	protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {