You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 08:56:58 UTC

[1/4] flink git commit: [hotfix][checkstyle] fix warnings in LocalBufferPool

Repository: flink
Updated Branches:
  refs/heads/master 236d28983 -> 388a083c9


[hotfix][checkstyle] fix warnings in LocalBufferPool


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ce3ff64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ce3ff64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ce3ff64

Branch: refs/heads/master
Commit: 5ce3ff643b1c254c254534fd03f9035bb0cd4964
Parents: 236d289
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:35:44 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:06 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/LocalBufferPool.java  | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ce3ff64/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 0a311aa..fa15678 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -33,14 +33,14 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * A buffer pool used to manage a number of {@link Buffer} instances from the
  * {@link NetworkBufferPool}.
- * <p>
- * Buffer requests are mediated to the network buffer pool to ensure dead-lock
+ *
+ * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
  * free operation of the network stack by limiting the number of buffers per
  * local buffer pool. It also implements the default mechanism for buffer
  * recycling, which ensures that every buffer is ultimately returned to the
  * network buffer pool.
  *
- * <p> The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
+ * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}). It
  * will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
  * match its new size.
  */
@@ -50,7 +50,7 @@ class LocalBufferPool implements BufferPool {
 	/** Global network buffer pool to get buffers from. */
 	private final NetworkBufferPool networkBufferPool;
 
-	/** The minimum number of required segments for this pool */
+	/** The minimum number of required segments for this pool. */
 	private final int numberOfRequiredMemorySegments;
 
 	/**
@@ -68,7 +68,7 @@ class LocalBufferPool implements BufferPool {
 	/** Maximum number of network buffers to allocate. */
 	private final int maxNumberOfMemorySegments;
 
-	/** The current size of this pool */
+	/** The current size of this pool. */
 	private int currentPoolSize;
 
 	/**


[3/4] flink git commit: [hotfix][network] minor optimisation in LocalBufferPool

Posted by sr...@apache.org.
[hotfix][network] minor optimisation in LocalBufferPool


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a932ab86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a932ab86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a932ab86

Branch: refs/heads/master
Commit: a932ab8631fec8f9a670e7c0e5e9ef3b8a2b7f63
Parents: 75e5953
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:36:33 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:19 2018 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/io/network/buffer/LocalBufferPool.java  | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a932ab86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index fa15678..92a8e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -228,9 +228,7 @@ class LocalBufferPool implements BufferPool {
 
 					if (segment != null) {
 						numberOfRequestedMemorySegments++;
-						availableMemorySegments.add(segment);
-
-						continue;
+						return segment;
 					}
 				}
 


[4/4] flink git commit: [FLINK-9144][network] fix SpillableSubpartition causing jobs to hang when spilling

Posted by sr...@apache.org.
[FLINK-9144][network] fix SpillableSubpartition causing jobs to hang when spilling

This closes #5842.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388a083c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388a083c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388a083c

Branch: refs/heads/master
Commit: 388a083c909d1f1b065e549ba70359358eb6e330
Parents: a932ab8
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:34:44 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:27 2018 +0200

----------------------------------------------------------------------
 .../test-scripts/test_batch_allround.sh         |  26 ++++-
 .../partition/SpillableSubpartition.java        |  45 ++++++--
 .../network/buffer/BufferBuilderTestUtils.java  |   4 +
 .../partition/SpillableSubpartitionTest.java    | 114 ++++++++++++++++++-
 4 files changed, 169 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index acdc37e..834fbad 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -23,14 +23,34 @@ TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-dataset-allr
 
 echo "Run DataSet-Allround-Test Program"
 
+# modify configuration to include spilling to disk
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
+echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
+
 start_cluster
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 $FLINK_DIR/bin/taskmanager.sh start
 
-$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 2 --outputPath $TEST_DATA_DIR/out/dataset_allround
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  stop_cluster
+  $FLINK_DIR/bin/taskmanager.sh stop-all
+
+  # revert our modifications to the Flink distribution
+  mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
 
-stop_cluster
-$FLINK_DIR/bin/taskmanager.sh stop-all
+$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround
 
 check_result_hash "DataSet-Allround-Test" $TEST_DATA_DIR/out/dataset_allround "d3cf2aeaa9320c772304cba42649eb47"

http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 7f92a34..69b461b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -93,6 +93,11 @@ class SpillableSubpartition extends ResultSubpartition {
 
 	@Override
 	public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
+		return add(bufferConsumer, false);
+	}
+
+	private boolean add(BufferConsumer bufferConsumer, boolean forceFinishRemainingBuffers)
+			throws IOException {
 		checkNotNull(bufferConsumer);
 
 		synchronized (buffers) {
@@ -109,7 +114,7 @@ class SpillableSubpartition extends ResultSubpartition {
 			increaseBuffersInBacklog(bufferConsumer);
 
 			if (spillWriter != null) {
-				spillFinishedBufferConsumers();
+				spillFinishedBufferConsumers(forceFinishRemainingBuffers);
 			}
 		}
 		return true;
@@ -127,7 +132,7 @@ class SpillableSubpartition extends ResultSubpartition {
 	@Override
 	public synchronized void finish() throws IOException {
 		synchronized (buffers) {
-			if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE))) {
+			if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
 				isFinished = true;
 			}
 
@@ -228,7 +233,7 @@ class SpillableSubpartition extends ResultSubpartition {
 				spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
 
 				int numberOfBuffers = buffers.size();
-				long spilledBytes = spillFinishedBufferConsumers();
+				long spilledBytes = spillFinishedBufferConsumers(isFinished);
 				int spilledBuffers = numberOfBuffers - buffers.size();
 
 				LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
@@ -243,21 +248,39 @@ class SpillableSubpartition extends ResultSubpartition {
 	}
 
 	@VisibleForTesting
-	protected long spillFinishedBufferConsumers() throws IOException {
+	long spillFinishedBufferConsumers(boolean forceFinishRemainingBuffers) throws IOException {
 		long spilledBytes = 0;
 
 		while (!buffers.isEmpty()) {
-			BufferConsumer bufferConsumer = buffers.peek();
+			BufferConsumer bufferConsumer = buffers.getFirst();
 			Buffer buffer = bufferConsumer.build();
 			updateStatistics(buffer);
-			spilledBytes += buffer.getSize();
-			spillWriter.writeBlock(buffer);
-
-			if (bufferConsumer.isFinished()) {
+			int bufferSize = buffer.getSize();
+			spilledBytes += bufferSize;
+
+			// NOTE we may be in the process of finishing the subpartition where any buffer should
+			// be treated as if it was finished!
+			if (bufferConsumer.isFinished() || forceFinishRemainingBuffers) {
+				if (bufferSize > 0) {
+					spillWriter.writeBlock(buffer);
+				} else {
+					// If we skip a buffer for the spill writer, we need to adapt the backlog accordingly
+					decreaseBuffersInBacklog(buffer);
+					buffer.recycleBuffer();
+				}
 				bufferConsumer.close();
 				buffers.poll();
-			}
-			else {
+			} else {
+				// If there is already data, we need to spill it anyway, since we do not get this
+				// slice from the buffer consumer again during the next build.
+				// BEWARE: by doing so, we increase the actual number of buffers in the spill writer!
+				if (bufferSize > 0) {
+					spillWriter.writeBlock(buffer);
+					increaseBuffersInBacklog(bufferConsumer);
+				} else {
+					buffer.recycleBuffer();
+				}
+
 				return spilledBytes;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 7beb18f..9706a86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -47,6 +47,10 @@ public class BufferBuilderTestUtils {
 		BufferBuilder bufferBuilder = new BufferBuilder(
 			MemorySegmentFactory.allocateUnpooledSegment(size),
 			FreeingBufferRecycler.INSTANCE);
+		return fillBufferBuilder(bufferBuilder, dataSize);
+	}
+
+	public static BufferBuilder fillBufferBuilder(BufferBuilder bufferBuilder, int dataSize) {
 		bufferBuilder.appendAndCommit(ByteBuffer.allocate(dataSize));
 		return bufferBuilder;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 09e0291..2e47379 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -42,7 +42,7 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
@@ -51,6 +51,7 @@ import java.util.concurrent.Future;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.fillBufferBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
@@ -257,6 +258,85 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	/**
+	 * Tests that a spilled partition is correctly read back in via a spilled read view. The
+	 * partition went into spilled state before adding buffers and the access pattern resembles
+	 * the actual use of {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}.
+	 */
+	@Test
+	public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		assertEquals(0, partition.releaseMemory()); // <---- SPILL to disk
+
+		BufferBuilder[] bufferBuilders = new BufferBuilder[] {
+			createBufferBuilder(BUFFER_DATA_SIZE),
+			createBufferBuilder(BUFFER_DATA_SIZE),
+			createBufferBuilder(BUFFER_DATA_SIZE),
+			createBufferBuilder(BUFFER_DATA_SIZE)
+		};
+		BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
+			BufferBuilder::createBufferConsumer
+		).toArray(BufferConsumer[]::new);
+
+		BufferConsumer eventBufferConsumer =
+			EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+		final int eventSize = eventBufferConsumer.getWrittenBytes();
+
+		// note: only the newest buffer may be unfinished!
+		partition.add(bufferConsumers[0]);
+		fillBufferBuilder(bufferBuilders[0], BUFFER_DATA_SIZE).finish();
+		partition.add(bufferConsumers[1]);
+		fillBufferBuilder(bufferBuilders[1], BUFFER_DATA_SIZE).finish();
+		partition.add(eventBufferConsumer);
+		partition.add(bufferConsumers[2]);
+		bufferBuilders[2].finish(); // remains empty
+		partition.add(bufferConsumers[3]);
+		// last one: partially filled, unfinished
+		fillBufferBuilder(bufferBuilders[3], BUFFER_DATA_SIZE / 2);
+		// finished buffers only:
+		int expectedSize = BUFFER_DATA_SIZE * 2 + eventSize;
+
+		// now the bufferConsumer may be freed, depending on the timing of the write operation
+		// -> let's do this check at the end of the test (to save some time)
+		// still same statistics
+		assertEquals(5, partition.getTotalNumberOfBuffers());
+		assertEquals(3, partition.getBuffersInBacklog());
+		assertEquals(expectedSize, partition.getTotalNumberOfBytes());
+
+		partition.finish();
+		expectedSize += BUFFER_DATA_SIZE / 2; // previously unfinished buffer
+		expectedSize += 4; // + one EndOfPartitionEvent
+		assertEquals(6, partition.getTotalNumberOfBuffers());
+		assertEquals(3, partition.getBuffersInBacklog());
+		assertEquals(expectedSize, partition.getTotalNumberOfBytes());
+		Arrays.stream(bufferConsumers).forEach(bufferConsumer -> assertTrue(bufferConsumer.isRecycled()));
+
+		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
+		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
+
+		assertEquals(1, listener.getNumNotifications());
+
+		assertFalse(reader.nextBufferIsEvent()); // full buffer
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
+		assertEquals(2, partition.getBuffersInBacklog());
+
+		assertFalse(reader.nextBufferIsEvent()); // full buffer
+		assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
+		assertEquals(1, partition.getBuffersInBacklog());
+
+		assertTrue(reader.nextBufferIsEvent()); // event
+		assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
+		assertEquals(1, partition.getBuffersInBacklog());
+
+		assertFalse(reader.nextBufferIsEvent()); // partial buffer
+		assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
+		assertEquals(0, partition.getBuffersInBacklog());
+
+		assertTrue(reader.nextBufferIsEvent()); // end of partition event
+		assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
+		assertEquals(0, partition.getBuffersInBacklog());
+	}
+
+	/**
 	 * Tests that a spilled partition is correctly read back in via a spilled
 	 * read view.
 	 */
@@ -668,19 +748,41 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 	}
 
 	/**
-	 * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers()} spilled bytes counting.
+	 * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} spilled bytes and
+	 * buffers counting.
 	 */
 	@Test
-	public void testSpillFinishedBufferConsumers() throws Exception {
+	public void testSpillFinishedBufferConsumersFull() throws Exception {
 		SpillableSubpartition partition = createSubpartition();
 		BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE);
 
 		partition.add(bufferBuilder.createBufferConsumer());
 		assertEquals(0, partition.releaseMemory());
+		assertEquals(1, partition.getBuffersInBacklog());
+		// finally fill the buffer with some bytes
+		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
+		assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
+		assertEquals(1, partition.getBuffersInBacklog());
+	}
+
+	/**
+	 * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} spilled bytes and
+	 * buffers counting with partially filled buffers.
+	 */
+	@Test
+	public void testSpillFinishedBufferConsumersPartial() throws Exception {
+		SpillableSubpartition partition = createSubpartition();
+		BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE * 2);
+
+		partition.add(bufferBuilder.createBufferConsumer());
+		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE);
+
+		assertEquals(0, partition.releaseMemory());
+		assertEquals(2, partition.getBuffersInBacklog()); // partial one spilled, buffer consumer still enqueued
 		// finally fill the buffer with some bytes
-		bufferBuilder.appendAndCommit(ByteBuffer.allocate(BUFFER_DATA_SIZE));
-		bufferBuilder.finish(); // so that this buffer can be removed from the queue
-		assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers());
+		fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
+		assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
+		assertEquals(2, partition.getBuffersInBacklog());
 	}
 
 	/**


[2/4] flink git commit: [hotfix][network] extend logging message in SpillableSubpartition

Posted by sr...@apache.org.
[hotfix][network] extend logging message in SpillableSubpartition


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e5953b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e5953b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e5953b

Branch: refs/heads/master
Commit: 75e5953b91cf19f394e7db9ec5813f59203a7e32
Parents: 5ce3ff6
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:36:15 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:12 2018 +0200

----------------------------------------------------------------------
 .../runtime/io/network/partition/SpillableSubpartition.java    | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/75e5953b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 6b731d4..7f92a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -229,10 +229,12 @@ class SpillableSubpartition extends ResultSubpartition {
 
 				int numberOfBuffers = buffers.size();
 				long spilledBytes = spillFinishedBufferConsumers();
+				int spilledBuffers = numberOfBuffers - buffers.size();
 
-				LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
+				LOG.debug("Spilling {} bytes ({} buffers} for sub partition {} of {}.",
+					spilledBytes, spilledBuffers, index, parent.getPartitionId());
 
-				return numberOfBuffers - buffers.size();
+				return spilledBuffers;
 			}
 		}