You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2018/03/09 18:58:20 UTC
[6/6] flink git commit: [FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests
[FLINK-8755] [FLINK-8786] [network] Add and improve subpartition tests
+ also improve the subpartition tests in general to reduce some duplication
This closes #5581
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1a969f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1a969f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1a969f7
Branch: refs/heads/release-1.5
Commit: d1a969f7ad018ef44f40f974eb49ba004494fcdf
Parents: 835adcc
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Feb 23 12:13:20 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 9 17:01:54 2018 +0100
----------------------------------------------------------------------
.../partition/SpillableSubpartitionView.java | 2 +-
.../partition/PipelinedSubpartitionTest.java | 11 +-
.../partition/SpillableSubpartitionTest.java | 130 ++++++-------------
.../network/partition/SubpartitionTestBase.java | 78 ++++++++++-
4 files changed, 121 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 0f51bc8..65790d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -167,7 +167,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
parent.updateStatistics(current);
// if we are spilled (but still process a non-spilled nextBuffer), we don't know the
- // state of nextBufferIsEvent...
+ // state of nextBufferIsEvent or whether more buffers are available
if (spilledView == null) {
return new BufferAndBacklog(current, isMoreAvailable, newBacklog, nextBufferIsEvent);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index ee678ab..bc66c9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -135,7 +135,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
subpartition.add(bufferBuilder.createBufferConsumer());
- assertNextBuffer(readView, 1024, false, 1);
+ // note that since the buffer builder is not finished, there is still a retained instance!
+ assertNextBuffer(readView, 1024, false, 1, false, false);
assertEquals(1, subpartition.getBuffersInBacklog());
} finally {
readView.releaseAllResources();
@@ -157,7 +158,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
subpartition.add(createFilledBufferConsumer(1025)); // finished
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
- assertNextBuffer(readView, 1025, false, 1);
+ assertNextBuffer(readView, 1025, false, 1, false, true);
} finally {
subpartition.release();
}
@@ -178,8 +179,8 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
subpartition.flush();
- assertNextBuffer(readView, 1025, true, 1);
- assertNextBuffer(readView, 1024, false, 1);
+ assertNextBuffer(readView, 1025, true, 1, false, true);
+ assertNextBuffer(readView, 1024, false, 1, false, false);
} finally {
subpartition.release();
}
@@ -208,7 +209,7 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
subpartition.add(createFilledBufferConsumer(1024));
assertEquals(2, availablityListener.getNumNotifications());
- assertNextBuffer(readView, 1024, false, 0);
+ assertNextBuffer(readView, 1024, false, 0, false, true);
} finally {
readView.releaseAllResources();
subpartition.release();
http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index e41a85c..840669e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -24,13 +24,13 @@ import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsyncWithNoOpBufferFileWriter;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
-import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.partition.ResultSubpartition.BufferAndBacklog;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -52,7 +52,6 @@ import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
@@ -190,10 +189,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpillableSubpartition partition = createSubpartition();
BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+ BufferConsumer eventBufferConsumer =
+ EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+ final int eventSize = eventBufferConsumer.getWrittenBytes();
partition.add(bufferConsumer.copy());
partition.add(bufferConsumer.copy());
- partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+ partition.add(eventBufferConsumer);
partition.add(bufferConsumer);
assertEquals(4, partition.getTotalNumberOfBuffers());
@@ -207,13 +209,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
// still same statistics
assertEquals(4, partition.getTotalNumberOfBuffers());
assertEquals(3, partition.getBuffersInBacklog());
- assertEquals(BUFFER_DATA_SIZE * 4, partition.getTotalNumberOfBytes());
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize, partition.getTotalNumberOfBytes());
partition.finish();
// + one EndOfPartitionEvent
assertEquals(5, partition.getTotalNumberOfBuffers());
assertEquals(3, partition.getBuffersInBacklog());
- assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes());
AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
@@ -221,59 +223,24 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(1, listener.getNumNotifications());
assertFalse(reader.nextBufferIsEvent()); // buffer
- BufferAndBacklog read = reader.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
assertEquals(2, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertFalse(read.nextBufferIsEvent());
assertFalse(reader.nextBufferIsEvent()); // buffer
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertTrue(read.nextBufferIsEvent());
assertTrue(reader.nextBufferIsEvent()); // event
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertFalse(read.buffer().isBuffer());
+ assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
assertEquals(1, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- read.buffer().recycleBuffer();
- assertFalse(read.nextBufferIsEvent());
assertFalse(reader.nextBufferIsEvent()); // buffer
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
assertEquals(0, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertTrue(read.nextBufferIsEvent());
assertTrue(reader.nextBufferIsEvent()); // end of partition event
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertFalse(read.buffer().isBuffer());
+ assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
assertEquals(0, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertEquals(EndOfPartitionEvent.class,
- EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertFalse(read.nextBufferIsEvent());
// finally check that the bufferConsumer has been freed after a successful (or failed) write
final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
@@ -292,10 +259,13 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
SpillableSubpartition partition = createSubpartition();
BufferConsumer bufferConsumer = createFilledBufferConsumer(BUFFER_DATA_SIZE, BUFFER_DATA_SIZE);
+ BufferConsumer eventBufferConsumer =
+ EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+ final int eventSize = eventBufferConsumer.getWrittenBytes();
partition.add(bufferConsumer.copy());
partition.add(bufferConsumer.copy());
- partition.add(BufferBuilderTestUtils.createEventBufferConsumer(BUFFER_DATA_SIZE));
+ partition.add(eventBufferConsumer);
partition.add(bufferConsumer);
partition.finish();
@@ -311,17 +281,12 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertFalse(bufferConsumer.isRecycled());
assertFalse(reader.nextBufferIsEvent());
- BufferAndBacklog read = reader.getNextBuffer(); // first buffer (non-spilled)
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
+ // first buffer (non-spilled)
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, false);
assertEquals(BUFFER_DATA_SIZE, partition.getTotalNumberOfBytes()); // only updated when getting/spilling the buffers
assertEquals(2, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- read.buffer().recycleBuffer();
- assertTrue(read.isMoreAvailable());
assertEquals(1, listener.getNumNotifications()); // since isMoreAvailable is set to true, no need for notification
assertFalse(bufferConsumer.isRecycled());
- assertFalse(read.nextBufferIsEvent());
// Spill now
assertEquals(3, partition.releaseMemory());
@@ -330,59 +295,44 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertEquals(5, partition.getTotalNumberOfBuffers());
assertEquals(2, partition.getBuffersInBacklog());
// only updated when getting/spilling the buffers but without the nextBuffer (kept in memory)
- assertEquals(BUFFER_DATA_SIZE * 3 + 4, partition.getTotalNumberOfBytes());
+ assertEquals(BUFFER_DATA_SIZE * 2 + eventSize + 4, partition.getTotalNumberOfBytes());
+ // wait for successfully spilling all buffers (before that we may not access any spilled buffer and cannot rely on isMoreAvailable!)
listener.awaitNotifications(2, 30_000);
// Spiller finished
assertEquals(2, listener.getNumNotifications());
+ // after consuming and releasing the next buffer, the bufferConsumer may be freed,
+ // depending on the timing of the last write operation
+ // -> retain once so that we can check below
+ Buffer buffer = bufferConsumer.build();
+ buffer.retainBuffer();
+
assertFalse(reader.nextBufferIsEvent()); // second buffer (retained in SpillableSubpartition#nextBuffer)
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
- assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, false);
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // finally integrates the nextBuffer statistics
assertEquals(1, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- read.buffer().recycleBuffer();
- // now the bufferConsumer may be freed, depending on the timing of the write operation
- // -> let's do this check at the end of the test (to save some time)
- assertTrue(read.nextBufferIsEvent());
+
+ bufferConsumer.close(); // recycle the retained buffer from above (should be the last reference!)
assertTrue(reader.nextBufferIsEvent()); // the event (spilled)
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertFalse(read.buffer().isBuffer());
- assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+ assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(1, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- read.buffer().recycleBuffer();
- assertFalse(read.nextBufferIsEvent());
assertFalse(reader.nextBufferIsEvent()); // last buffer (spilled)
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertTrue(read.buffer().isBuffer());
- assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 0, true, true);
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertTrue(read.nextBufferIsEvent());
+
+ buffer.recycleBuffer();
+ assertTrue(buffer.isRecycled());
// End of partition
assertTrue(reader.nextBufferIsEvent());
- read = reader.getNextBuffer();
- assertNotNull(read);
- assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
+ assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
+ assertEquals(BUFFER_DATA_SIZE * 3 + eventSize + 4, partition.getTotalNumberOfBytes()); // already updated during spilling
assertEquals(0, partition.getBuffersInBacklog());
- assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
- assertEquals(EndOfPartitionEvent.class,
- EventSerializer.fromBuffer(read.buffer(), ClassLoader.getSystemClassLoader()).getClass());
- assertFalse(read.buffer().isRecycled());
- read.buffer().recycleBuffer();
- assertTrue(read.buffer().isRecycled());
- assertFalse(read.nextBufferIsEvent());
// finally check that the bufferConsumer has been freed after a successful (or failed) write
final long deadline = System.currentTimeMillis() + 30_000L; // 30 secs
http://git-wip-us.apache.org/repos/asf/flink/blob/d1a969f7/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index a3f18f6..8c90215 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -18,19 +18,26 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -138,11 +145,74 @@ public abstract class SubpartitionTestBase extends TestLogger {
ResultSubpartitionView readView,
int expectedReadableBufferSize,
boolean expectedIsMoreAvailable,
- int expectedBuffersInBacklog) throws IOException, InterruptedException {
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+ assertNextBufferOrEvent(
+ readView,
+ expectedReadableBufferSize,
+ true,
+ null,
+ expectedIsMoreAvailable,
+ expectedBuffersInBacklog,
+ expectedNextBufferIsEvent,
+ expectedRecycledAfterRecycle);
+ }
+
+ static void assertNextEvent(
+ ResultSubpartitionView readView,
+ int expectedReadableBufferSize,
+ Class<? extends AbstractEvent> expectedEventClass,
+ boolean expectedIsMoreAvailable,
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+ assertNextBufferOrEvent(
+ readView,
+ expectedReadableBufferSize,
+ false,
+ expectedEventClass,
+ expectedIsMoreAvailable,
+ expectedBuffersInBacklog,
+ expectedNextBufferIsEvent,
+ expectedRecycledAfterRecycle);
+ }
+
+ private static void assertNextBufferOrEvent(
+ ResultSubpartitionView readView,
+ int expectedReadableBufferSize,
+ boolean expectedIsBuffer,
+ @Nullable Class<? extends AbstractEvent> expectedEventClass,
+ boolean expectedIsMoreAvailable,
+ int expectedBuffersInBacklog,
+ boolean expectedNextBufferIsEvent,
+ boolean expectedRecycledAfterRecycle) throws IOException, InterruptedException {
+ checkArgument(expectedEventClass == null || !expectedIsBuffer);
+
ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
- assertEquals(expectedReadableBufferSize, bufferAndBacklog.buffer().readableBytes());
- assertEquals(expectedIsMoreAvailable, bufferAndBacklog.isMoreAvailable());
- assertEquals(expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+ assertNotNull(bufferAndBacklog);
+ try {
+ assertEquals("buffer size", expectedReadableBufferSize,
+ bufferAndBacklog.buffer().readableBytes());
+ assertEquals("buffer or event", expectedIsBuffer,
+ bufferAndBacklog.buffer().isBuffer());
+ if (expectedEventClass != null) {
+ assertThat(EventSerializer
+ .fromBuffer(bufferAndBacklog.buffer(), ClassLoader.getSystemClassLoader()),
+ instanceOf(expectedEventClass));
+ }
+ assertEquals("more available", expectedIsMoreAvailable,
+ bufferAndBacklog.isMoreAvailable());
+ assertEquals("more available", expectedIsMoreAvailable, readView.isAvailable());
+ assertEquals("backlog", expectedBuffersInBacklog, bufferAndBacklog.buffersInBacklog());
+ assertEquals("next is event", expectedNextBufferIsEvent,
+ bufferAndBacklog.nextBufferIsEvent());
+
+ assertFalse("not recycled", bufferAndBacklog.buffer().isRecycled());
+ } finally {
+ bufferAndBacklog.buffer().recycleBuffer();
+ }
+ assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
}
protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {