You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2019/07/08 10:39:55 UTC

[flink] branch master updated: [FLINK-11082][network] Fix the calculation of backlog in PipelinedSubpartition

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d051ccc  [FLINK-11082][network] Fix the calculation of backlog in PipelinedSubpartition
d051ccc is described below

commit d051cccd92236b13b4e9de96f0a0eeaf171b29ca
Author: Zhijiang <wa...@aliyun.com>
AuthorDate: Fri Jul 5 11:22:48 2019 +0800

    [FLINK-11082][network] Fix the calculation of backlog in PipelinedSubpartition
    
    The backlog of a subpartition should indicate how many buffers are consumable, so that the consumer can feed back the corresponding credits for transporting these buffers. But in the current PipelinedSubpartition implementation, the backlog is increased by 1 when a BufferConsumer is added to the PipelinedSubpartition, and decreased by 1 when a BufferConsumer is removed from the PipelinedSubpartition. So the backlog only reflects how many buffers are retained in the PipelinedSubpartition, which is not a [...]
    
    This backlog inconsistency might result in a misdistribution of floating buffers on the consumer side: the consumer requests floating buffers based on the backlog value, so a floating buffer might remain unused in a RemoteInputChannel for a long time after it was requested.
    
    As for the solution, the last buffer in a PipelinedSubpartition is only consumable once a flush has been triggered or the partition has finished. So we can calculate the backlog precisely based on the partition's flushed/finished state.
---
 .../network/partition/PipelinedSubpartition.java   |   6 +-
 .../PipelinedSubpartitionWithReadViewTest.java     | 102 +++++++++++++++++----
 2 files changed, 88 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
index 7394e6e..0c38ae2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
@@ -367,7 +367,11 @@ class PipelinedSubpartition extends ResultSubpartition {
 	@SuppressWarnings("FieldAccessNotGuarded")
 	@VisibleForTesting
 	public int getBuffersInBacklog() {
-		return buffersInBacklog;
+		if (flushRequested || isFinished) {
+			return buffersInBacklog;
+		} else {
+			return Math.max(buffersInBacklog - 1, 0);
+		}
 	}
 
 	private boolean shouldNotifyDataAvailable() {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
index af4eb05..e945342 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionWithReadViewTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
@@ -40,6 +41,7 @@ import static org.apache.flink.runtime.io.network.util.TestBufferFactory.BUFFER_
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -94,9 +96,10 @@ public class PipelinedSubpartitionWithReadViewTest {
 		bufferBuilder = createBufferBuilder();
 		subpartition.add(bufferBuilder.createBufferConsumer());
 
+		assertEquals(1, subpartition.getBuffersInBacklog());
 		assertEquals(1, availablityListener.getNumNotifications()); // notification from finishing previous buffer.
 		assertNull(readView.getNextBuffer());
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
 	}
 
 	@Test
@@ -108,8 +111,8 @@ public class PipelinedSubpartitionWithReadViewTest {
 		subpartition.add(bufferBuilder.createBufferConsumer());
 
 		// note that since the buffer builder is not finished, there is still a retained instance!
-		assertNextBuffer(readView, 1024, false, 1, false, false);
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
+		assertNextBuffer(readView, 1024, false, 0, false, false);
 	}
 
 	/**
@@ -121,10 +124,11 @@ public class PipelinedSubpartitionWithReadViewTest {
 		subpartition.add(createFilledBufferConsumer(1025)); // finished
 		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 
+		assertEquals(1, subpartition.getBuffersInBacklog());
 		assertThat(availablityListener.getNumNotifications(), greaterThan(0L));
-		assertNextBuffer(readView, 1025, false, 1, false, true);
+		assertNextBuffer(readView, 1025, false, 0, false, true);
 		// not notified, but we could still access the unfinished buffer
-		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNextBuffer(readView, 1024, false, 0, false, false);
 		assertNoNextBuffer(readView);
 	}
 
@@ -137,13 +141,17 @@ public class PipelinedSubpartitionWithReadViewTest {
 		subpartition.add(createFilledBufferConsumer(1025)); // finished
 		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 		long oldNumNotifications = availablityListener.getNumNotifications();
+
+		assertEquals(1, subpartition.getBuffersInBacklog());
+
 		subpartition.flush();
 		// buffer queue is > 1, should already be notified, no further notification necessary
 		assertThat(oldNumNotifications, greaterThan(0L));
 		assertEquals(oldNumNotifications, availablityListener.getNumNotifications());
 
+		assertEquals(2, subpartition.getBuffersInBacklog());
 		assertNextBuffer(readView, 1025, true, 1, false, true);
-		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertNextBuffer(readView, 1024, false, 0, false, false);
 		assertNoNextBuffer(readView);
 	}
 
@@ -159,7 +167,8 @@ public class PipelinedSubpartitionWithReadViewTest {
 		subpartition.add(createFilledBufferConsumer(1025)); // finished
 		subpartition.add(createFilledBufferBuilder(1024).createBufferConsumer()); // not finished
 
-		assertNextBuffer(readView, 1025, false, 1, false, true);
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertNextBuffer(readView, 1025, false, 0, false, true);
 
 		long oldNumNotifications = availablityListener.getNumNotifications();
 		subpartition.flush();
@@ -169,7 +178,8 @@ public class PipelinedSubpartitionWithReadViewTest {
 		// calling again should not flush again
 		assertEquals(oldNumNotifications + 1, availablityListener.getNumNotifications());
 
-		assertNextBuffer(readView, 1024, false, 1, false, false);
+		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertNextBuffer(readView, 1024, false, 0, false, false);
 		assertNoNextBuffer(readView);
 	}
 
@@ -185,7 +195,7 @@ public class PipelinedSubpartitionWithReadViewTest {
 
 		subpartition.add(createFilledBufferConsumer(0));
 		assertEquals(2, availablityListener.getNumNotifications());
-		assertEquals(3, subpartition.getBuffersInBacklog());
+		assertEquals(2, subpartition.getBuffersInBacklog());
 
 		subpartition.add(createFilledBufferConsumer(1024));
 		assertEquals(2, availablityListener.getNumNotifications());
@@ -212,14 +222,14 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertFalse(readView.nextBufferIsEvent());
 
 		assertEquals(1, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertEquals(0, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 
 		// ...should have resulted in a notification
 		assertEquals(1, availablityListener.getNumNotifications());
 
 		// ...and one available result
-		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertNoNextBuffer(readView);
@@ -230,11 +240,11 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertFalse(readView.nextBufferIsEvent());
 
 		assertEquals(2, subpartition.getTotalNumberOfBuffers());
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(2, availablityListener.getNumNotifications());
 
-		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 		assertNoNextBuffer(readView);
@@ -251,22 +261,22 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertFalse(readView.nextBufferIsEvent());
 
 		assertEquals(5, subpartition.getTotalNumberOfBuffers());
-		assertEquals(2, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
+		assertEquals(1, subpartition.getBuffersInBacklog()); // two buffers (events don't count)
 		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(4, availablityListener.getNumNotifications());
 
 		// the first buffer
-		assertNextBuffer(readView, BUFFER_SIZE, true, subpartition.getBuffersInBacklog() - 1, true, true);
+		assertNextBuffer(readView, BUFFER_SIZE, true, 0, true, true);
 		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// the event
-		assertNextEvent(readView, BUFFER_SIZE, null, true, subpartition.getBuffersInBacklog(), false, true);
+		assertNextEvent(readView, BUFFER_SIZE, null, true, 0, false, true);
 		assertEquals(4 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
-		assertEquals(1, subpartition.getBuffersInBacklog());
+		assertEquals(0, subpartition.getBuffersInBacklog());
 
 		// the remaining buffer
-		assertNextBuffer(readView, BUFFER_SIZE, false, subpartition.getBuffersInBacklog() - 1, false, true);
+		assertNextBuffer(readView, BUFFER_SIZE, false, 0, false, true);
 		assertEquals(5 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes()); // only updated when getting the buffer
 		assertEquals(0, subpartition.getBuffersInBacklog());
 
@@ -279,6 +289,60 @@ public class PipelinedSubpartitionWithReadViewTest {
 		assertEquals(4, availablityListener.getNumNotifications());
 	}
 
+	@Test
+	public void testBacklogConsistentWithNumberOfConsumableBuffers() throws Exception {
+		testBacklogConsistentWithNumberOfConsumableBuffers(false, false);
+	}
+
+	@Test
+	public void testBacklogConsistentWithConsumableBuffersForFlushedPartition() throws Exception {
+		testBacklogConsistentWithNumberOfConsumableBuffers(true, false);
+	}
+
+	@Test
+	public void testBacklogConsistentWithConsumableBuffersForFinishedPartition() throws Exception {
+		testBacklogConsistentWithNumberOfConsumableBuffers(false, true);
+	}
+
+	private void testBacklogConsistentWithNumberOfConsumableBuffers(boolean isFlushRequested, boolean isFinished) throws Exception {
+		final int numberOfAddedBuffers = 5;
+
+		for (int i = 1; i <= numberOfAddedBuffers; i++) {
+			final BufferBuilder bufferBuilder = createFilledBufferBuilder(1024, 10);
+			subpartition.add(bufferBuilder.createBufferConsumer());
+
+			if (i < numberOfAddedBuffers || isFinished) {
+				bufferBuilder.finish();
+			}
+		}
+
+		if (isFlushRequested) {
+			subpartition.flush();
+		}
+
+		if (isFinished) {
+			subpartition.finish();
+		}
+
+		final int backlog = subpartition.getBuffersInBacklog();
+
+		int numberOfConsumableBuffers = 0;
+		try (final CloseableRegistry closeableRegistry = new CloseableRegistry()) {
+			while (readView.isAvailable()) {
+				ResultSubpartition.BufferAndBacklog bufferAndBacklog = readView.getNextBuffer();
+				assertNotNull(bufferAndBacklog);
+
+				if (bufferAndBacklog.buffer().isBuffer()) {
+					++numberOfConsumableBuffers;
+				}
+
+				closeableRegistry.registerCloseable(bufferAndBacklog.buffer() :: recycleBuffer);
+			}
+
+			assertThat(backlog, is(numberOfConsumableBuffers));
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	static void assertNextBuffer(