You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by nk...@apache.org on 2018/09/19 19:32:54 UTC

[flink] 09/11: [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization

This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 72d522efeef956cadeb8fe53778985855f8ee738
Author: Nico Kruber <ni...@data-artisans.com>
AuthorDate: Tue Sep 18 12:10:21 2018 +0200

    [hotfix][network][tests] split PipelinedSubpartitionTest for better initialization
    
    - add PipelinedSubpartitionWithReadViewTest which always creates a subpartition,
    an availability listener, and a read view before each test and cleans up after
    each test
    - remove mockito use from testBasicPipelinedProduceConsumeLogic()
---
 .../partition/PipelinedSubpartitionTest.java       | 314 +--------------------
 .../PipelinedSubpartitionWithReadViewTest.java     | 276 ++++++++++++++++++
 .../io/network/partition/SubpartitionTestBase.java |   2 +-
 3 files changed, 292 insertions(+), 300 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
index b75bb7a..82f61ab 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java
@@ -40,19 +40,12 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
-import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
 import static org.apache.flink.util.FutureUtil.waitForAll;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -62,6 +55,8 @@ import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link PipelinedSubpartition}.
+ *
+ * @see PipelinedSubpartitionWithReadViewTest
  */
 public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 
@@ -80,189 +75,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		return new PipelinedSubpartition(0, parent);
 	}
 
-	@Test(expected = IllegalStateException.class)
-	public void testAddTwoNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			subpartition.add(createBufferBuilder().createBufferConsumer());
-			assertNull(readView.getNextBuffer());
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddEmptyNonFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(0, availablityListener.getNumNotifications());
-			assertNull(readView.getNextBuffer());
-
-			bufferBuilder.finish();
-			bufferBuilder = createBufferBuilder();
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
-			assertNull(readView.getNextBuffer());
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			BufferBuilder bufferBuilder = createBufferBuilder();
-			bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
-			subpartition.add(bufferBuilder.createBufferConsumer());
-
-			// note that since the buffer builder is not finished, there is still a retained instance!
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertEquals(1, subpartition.getBuffersInBacklog());
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * Normally moreAvailable flag from InputChannel should ignore non finished BufferConsumers, otherwise we would
-	 * busy loop on the unfinished BufferConsumers.
-	 */
-	@Test
-	public void testUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
-			assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
-			assertNextBuffer(readView, 1025, false, 1, false, true);
-			// not notified, but we could still access the unfinished buffer
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * After flush call unfinished BufferConsumers should be reported as available, otherwise we might not flush some
-	 * of the data.
-	 */
-	@Test
-	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-			long oldNumNotifications = availablityListener.getNumNotifications();
-			subpartition.flush();
-			// buffer queue is > 1, should already be notified, no further notification necessary
-			assertThat(oldNumNotifications, greaterThan(0L));
-			assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1025, true, 1, false, true);
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	/**
-	 * A flush call with a buffer size of 1 should always notify consumers (unless already flushed).
-	 */
-	@Test
-	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-
-		try {
-			// no buffers -> no notification or any other effects
-			subpartition.flush();
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(1025)); // finished
-			subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
-
-			assertNextBuffer(readView, 1025, false, 1, false, true);
-
-			long oldNumNotifications = availablityListener.getNumNotifications();
-			subpartition.flush();
-			// buffer queue is 1 again -> need to flush
-			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
-			subpartition.flush();
-			// calling again should not flush again
-			assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1024, false, 1, false, false);
-			assertNoNextBuffer(readView);
-		} finally {
-			subpartition.release();
-		}
-	}
-
-	@Test
-	public void testMultipleEmptyBuffers() throws Exception {
-		final ResultSubpartition subpartition = createSubpartition();
-		AwaitableBufferAvailablityListener availablityListener = new AwaitableBufferAvailablityListener();
-		ResultSubpartitionView readView = subpartition.createReadView(availablityListener);
-		availablityListener.resetNotificationCounters();
-
-		try {
-			assertEquals(0, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-
-			assertEquals(1, availablityListener.getNumNotifications());
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			subpartition.add(createFilledBufferConsumer(0));
-			assertEquals(2, availablityListener.getNumNotifications());
-			assertEquals(3, subpartition.getBuffersInBacklog());
-
-			subpartition.add(createFilledBufferConsumer(1024));
-			assertEquals(2, availablityListener.getNumNotifications());
-
-			assertNextBuffer(readView, 1024, false, 0, false, true);
-		} finally {
-			readView.releaseAllResources();
-			subpartition.release();
-		}
-	}
-
 	@Test
 	public void testIllegalReadViewRequest() throws Exception {
 		final PipelinedSubpartition subpartition = createSubpartition();
@@ -278,100 +90,23 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		}
 	}
 
+	/**
+	 * Verifies that the isReleased() check of the view checks the parent
+	 * subpartition.
+	 */
 	@Test
-	public void testEmptyFlush() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+	public void testIsReleasedChecksParent() {
+		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
 
-		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
-		subpartition.createReadView(listener);
-		subpartition.flush();
-		assertEquals(0, listener.getNumNotifications());
-	}
+		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
+			subpartition, mock(BufferAvailabilityListener.class));
 
-	@Test
-	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
-		final PipelinedSubpartition subpartition = createSubpartition();
+		assertFalse(reader.isReleased());
+		verify(subpartition, times(1)).isReleased();
 
-		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
-
-		ResultSubpartitionView view = subpartition.createReadView(listener);
-
-		// Empty => should return null
-		assertFalse(view.nextBufferIsEvent());
-		assertNoNextBuffer(view);
-		assertFalse(view.nextBufferIsEvent()); // also after getNextBuffer()
-		verify(listener, times(0)).notifyDataAvailable();
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(1, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-
-		// ...should have resulted in a notification
-		verify(listener, times(1)).notifyDataAvailable();
-
-		// ...and one available result
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// Add data to the queue...
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(2, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
-		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(2)).notifyDataAvailable();
-
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// some tests with events
-
-		// fill with: buffer, event, and buffer
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
-		assertFalse(view.nextBufferIsEvent());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
-		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		verify(listener, times(4)).notifyDataAvailable();
-
-		// the first buffer
-		assertNextBuffer(view, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
-		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-
-		// the event
-		assertNextEvent(view, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
-		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
-
-		// the remaining buffer
-		assertNextBuffer(view, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		// nothing more
-		assertNoNextBuffer(view);
-		assertEquals(0, subpartition.getBuffersInBacklog());
-
-		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
-		verify(listener, times(4)).notifyDataAvailable();
+		when(subpartition.isReleased()).thenReturn(true);
+		assertTrue(reader.isReleased());
+		verify(subpartition, times(2)).isReleased();
 	}
 
 	@Test
@@ -394,25 +129,6 @@ public class PipelinedSubpartitionTest extends SubpartitionTestBase {
 		testProduceConsume(true, true);
 	}
 
-	/**
-	 * Verifies that the isReleased() check of the view checks the parent
-	 * subpartition.
-	 */
-	@Test
-	public void testIsReleasedChecksParent() throws Exception {
-		PipelinedSubpartition subpartition = mock(PipelinedSubpartition.class);
-
-		PipelinedSubpartitionView reader = new PipelinedSubpartitionView(
-				subpartition, mock(BufferAvailabilityListener.class));
-
-		assertFalse(reader.isReleased());
-		verify(subpartition, times(1)).isReleased();
-
-		when(subpartition.isReleased()).thenReturn(true);
-		assertTrue(reader.isReleased());
-		verify(subpartition, times(2)).isReleased();
-	}
-
 	private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
 		// Config
 		final int producerBufferPoolSize = 8;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
new file mode 100644
index 0000000..6f9920e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createEventBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextBuffer;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNextEvent;
+import static org.apache.flink.runtime.io.network.partition.SubpartitionTestBase.assertNoNextBuffer;
+import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for {@link PipelinedSubpartition} which require an availability listener and a
+ * read view.
+ *
+ * @see PipelinedSubpartitionTest
+ */
+public class PipelinedSubpartitionWithReadViewTest {
+
+	private PipelinedSubpartition subpartition;
+	private AwaitableBufferAvailablityListener availablityListener;
+	private PipelinedSubpartitionView readView;
+
+	@Before
+	public void setup() throws IOException {
+		final ResultPartition parent = mock(ResultPartition.class);
+		subpartition = new PipelinedSubpartition(0, parent);
+		availablityListener = new AwaitableBufferAvailablityListener();
+		readView = subpartition.createReadView(availablityListener);
+	}
+
+	@After
+	public void tearDown() {
+		readView.releaseAllResources();
+		subpartition.release();
+	}
+
+	@Test(expected = IllegalStateException.class)
+	public void testAddTwoNonFinishedBuffer() {
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		subpartition.add(createBufferBuilder().createBufferConsumer());
+		assertNull(readView.getNextBuffer());
+	}
+
+	@Test
+	public void testAddEmptyNonFinishedBuffer() {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(0, availablityListener.getNumNotifications());
+		assertNull(readView.getNextBuffer());
+
+		bufferBuilder.finish();
+		bufferBuilder = createBufferBuilder();
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
+		assertNull(readView.getNextBuffer());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	@Test
+	public void testAddNonEmptyNotFinishedBuffer() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		BufferBuilder bufferBuilder = createBufferBuilder();
+		bufferBuilder.appendAndCommit(ByteBuffer.allocate(1024));
+		subpartition.add(bufferBuilder.createBufferConsumer());
+
+		// note that since the buffer builder is not finished, there is still a retained instance!
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertEquals(1, subpartition.getBuffersInBacklog());
+	}
+
+	/**
+	 * Normally, the moreAvailable flag from InputChannel should ignore non-finished BufferConsumers; otherwise we
+	 * would busy-loop on the unfinished BufferConsumers.
+	 */
+	@Test
+	public void testUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+		// not notified, but we could still access the unfinished buffer
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * After a flush call, unfinished BufferConsumers should be reported as available; otherwise we might not flush
+	 * some of the data.
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished() throws Exception {
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is > 1, should already be notified, no further notification necessary
+		assertThat(oldNumNotifications, greaterThan(0L));
+		assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1025, true, 1, false, true);
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	/**
+	 * A flush call with a buffer queue size of 1 should always notify consumers (unless already flushed).
+	 */
+	@Test
+	public void testFlushWithUnfinishedBufferBehindFinished2() throws Exception {
+		// no buffers -> no notification or any other effects
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(1025)); // finished
+		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
+
+		assertNextBuffer(readView, 1025, false, 1, false, true);
+
+		long oldNumNotifications = availablityListener.getNumNotifications();
+		subpartition.flush();
+		// buffer queue is 1 again -> need to flush
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+		subpartition.flush();
+		// calling again should not flush again
+		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNoNextBuffer(readView);
+	}
+
+	@Test
+	public void testMultipleEmptyBuffers() throws Exception {
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+
+		assertEquals(1, availablityListener.getNumNotifications());
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		subpartition.add(createFilledBufferConsumer(0));
+		assertEquals(2, availablityListener.getNumNotifications());
+		assertEquals(3, subpartition.getBuffersInBacklog());
+
+		subpartition.add(createFilledBufferConsumer(1024));
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, 1024, false, 0, false, true);
+	}
+
+	@Test
+	public void testEmptyFlush()  {
+		subpartition.flush();
+		assertEquals(0, availablityListener.getNumNotifications());
+	}
+
+	@Test
+	public void testBasicPipelinedProduceConsumeLogic() throws Exception {
+		// Empty => should return null
+		assertFalse(readView.nextBufferIsEvent());
+		assertNoNextBuffer(readView);
+		assertFalse(readView.nextBufferIsEvent()); // also after getNextBuffer()
+		assertEquals(0, availablityListener.getNumNotifications());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(1, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+
+		// ...should have resulted in a notification
+		assertEquals(1, availablityListener.getNumNotifications());
+
+		// ...and one available result
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// Add data to the queue...
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(2, subpartition.getTotalNumberOfBuffers());
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(2, availablityListener.getNumNotifications());
+
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// some tests with events
+
+		// fill with: buffer, event, and buffer
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createEventBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+		subpartition.add(createFilledBufferConsumer(BUFFER_SIZE));
+		assertFalse(readView.nextBufferIsEvent());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
+		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(4, availablityListener.getNumNotifications());
+
+		// the first buffer
+		assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
+		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the event
+		assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
+		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
+		// the remaining buffer
+		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		// nothing more
+		assertNoNextBuffer(readView);
+		assertEquals(0, subpartition.getBuffersInBacklog());
+
+		assertEquals(5, subpartition.getTotalNumberOfBuffers());
+		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
+		assertEquals(4, availablityListener.getNumNotifications());
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 5989cf8..9f5e6d0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -217,7 +217,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
 		assertEquals("recycled", expectedRecycledAfterRecycle, bufferAndBacklog.buffer().isRecycled());
 	}
 
-	protected void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
+	static void assertNoNextBuffer(ResultSubpartitionView readView) throws IOException, InterruptedException {
 		assertNull(readView.getNextBuffer());
 	}
 }