You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/04/19 08:56:58 UTC
[1/4] flink git commit: [hotfix][checkstyle] fix warnings in
LocalBufferPool
Repository: flink
Updated Branches:
refs/heads/master 236d28983 -> 388a083c9
[hotfix][checkstyle] fix warnings in LocalBufferPool
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ce3ff64
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ce3ff64
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ce3ff64
Branch: refs/heads/master
Commit: 5ce3ff643b1c254c254534fd03f9035bb0cd4964
Parents: 236d289
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:35:44 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:06 2018 +0200
----------------------------------------------------------------------
.../flink/runtime/io/network/buffer/LocalBufferPool.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ce3ff64/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index 0a311aa..fa15678 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -33,14 +33,14 @@ import static org.apache.flink.util.Preconditions.checkState;
/**
* A buffer pool used to manage a number of {@link Buffer} instances from the
* {@link NetworkBufferPool}.
- * <p>
- * Buffer requests are mediated to the network buffer pool to ensure dead-lock
+ *
+ * <p>Buffer requests are mediated to the network buffer pool to ensure dead-lock
* free operation of the network stack by limiting the number of buffers per
* local buffer pool. It also implements the default mechanism for buffer
* recycling, which ensures that every buffer is ultimately returned to the
* network buffer pool.
*
- * <p> The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}. It
+ * <p>The size of this pool can be dynamically changed at runtime ({@link #setNumBuffers(int)}). It
* will then lazily return the required number of buffers to the {@link NetworkBufferPool} to
* match its new size.
*/
@@ -50,7 +50,7 @@ class LocalBufferPool implements BufferPool {
/** Global network buffer pool to get buffers from. */
private final NetworkBufferPool networkBufferPool;
- /** The minimum number of required segments for this pool */
+ /** The minimum number of required segments for this pool. */
private final int numberOfRequiredMemorySegments;
/**
@@ -68,7 +68,7 @@ class LocalBufferPool implements BufferPool {
/** Maximum number of network buffers to allocate. */
private final int maxNumberOfMemorySegments;
- /** The current size of this pool */
+ /** The current size of this pool. */
private int currentPoolSize;
/**
[3/4] flink git commit: [hotfix][network] minor optimisation in
LocalBufferPool
Posted by sr...@apache.org.
[hotfix][network] minor optimisation in LocalBufferPool
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a932ab86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a932ab86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a932ab86
Branch: refs/heads/master
Commit: a932ab8631fec8f9a670e7c0e5e9ef3b8a2b7f63
Parents: 75e5953
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:36:33 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:19 2018 +0200
----------------------------------------------------------------------
.../apache/flink/runtime/io/network/buffer/LocalBufferPool.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a932ab86/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
index fa15678..92a8e94 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
@@ -228,9 +228,7 @@ class LocalBufferPool implements BufferPool {
if (segment != null) {
numberOfRequestedMemorySegments++;
- availableMemorySegments.add(segment);
-
- continue;
+ return segment;
}
}
[4/4] flink git commit: [FLINK-9144][network] fix
SpillableSubpartition causing jobs to hang when spilling
Posted by sr...@apache.org.
[FLINK-9144][network] fix SpillableSubpartition causing jobs to hang when spilling
This closes #5842.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/388a083c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/388a083c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/388a083c
Branch: refs/heads/master
Commit: 388a083c909d1f1b065e549ba70359358eb6e330
Parents: a932ab8
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:34:44 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:27 2018 +0200
----------------------------------------------------------------------
.../test-scripts/test_batch_allround.sh | 26 ++++-
.../partition/SpillableSubpartition.java | 45 ++++++--
.../network/buffer/BufferBuilderTestUtils.java | 4 +
.../partition/SpillableSubpartitionTest.java | 114 ++++++++++++++++++-
4 files changed, 169 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
index acdc37e..834fbad 100755
--- a/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
+++ b/flink-end-to-end-tests/test-scripts/test_batch_allround.sh
@@ -23,14 +23,34 @@ TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/flink-dataset-allr
echo "Run DataSet-Allround-Test Program"
+# modify configuration to include spilling to disk
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+echo "taskmanager.network.memory.min: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
+echo "taskmanager.network.memory.max: 10485760" >> $FLINK_DIR/conf/flink-conf.yaml
+
start_cluster
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
$FLINK_DIR/bin/taskmanager.sh start
-$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 2 --outputPath $TEST_DATA_DIR/out/dataset_allround
+function test_cleanup {
+ # don't call ourselves again for another signal interruption
+ trap "exit -1" INT
+ # don't call ourselves again for normal exit
+ trap "" EXIT
+
+ stop_cluster
+ $FLINK_DIR/bin/taskmanager.sh stop-all
+
+ # revert our modifications to the Flink distribution
+ mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+
+ # make sure to run regular cleanup as well
+ cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
-stop_cluster
-$FLINK_DIR/bin/taskmanager.sh stop-all
+$FLINK_DIR/bin/flink run -p 4 $TEST_PROGRAM_JAR --loadFactor 4 --outputPath $TEST_DATA_DIR/out/dataset_allround
check_result_hash "DataSet-Allround-Test" $TEST_DATA_DIR/out/dataset_allround "d3cf2aeaa9320c772304cba42649eb47"
http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 7f92a34..69b461b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -93,6 +93,11 @@ class SpillableSubpartition extends ResultSubpartition {
@Override
public synchronized boolean add(BufferConsumer bufferConsumer) throws IOException {
+ return add(bufferConsumer, false);
+ }
+
+ private boolean add(BufferConsumer bufferConsumer, boolean forceFinishRemainingBuffers)
+ throws IOException {
checkNotNull(bufferConsumer);
synchronized (buffers) {
@@ -109,7 +114,7 @@ class SpillableSubpartition extends ResultSubpartition {
increaseBuffersInBacklog(bufferConsumer);
if (spillWriter != null) {
- spillFinishedBufferConsumers();
+ spillFinishedBufferConsumers(forceFinishRemainingBuffers);
}
}
return true;
@@ -127,7 +132,7 @@ class SpillableSubpartition extends ResultSubpartition {
@Override
public synchronized void finish() throws IOException {
synchronized (buffers) {
- if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE))) {
+ if (add(EventSerializer.toBufferConsumer(EndOfPartitionEvent.INSTANCE), true)) {
isFinished = true;
}
@@ -228,7 +233,7 @@ class SpillableSubpartition extends ResultSubpartition {
spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
int numberOfBuffers = buffers.size();
- long spilledBytes = spillFinishedBufferConsumers();
+ long spilledBytes = spillFinishedBufferConsumers(isFinished);
int spilledBuffers = numberOfBuffers - buffers.size();
LOG.debug("Spilling {} bytes ({} buffers) for sub partition {} of {}.",
@@ -243,21 +248,39 @@ class SpillableSubpartition extends ResultSubpartition {
}
@VisibleForTesting
- protected long spillFinishedBufferConsumers() throws IOException {
+ long spillFinishedBufferConsumers(boolean forceFinishRemainingBuffers) throws IOException {
long spilledBytes = 0;
while (!buffers.isEmpty()) {
- BufferConsumer bufferConsumer = buffers.peek();
+ BufferConsumer bufferConsumer = buffers.getFirst();
Buffer buffer = bufferConsumer.build();
updateStatistics(buffer);
- spilledBytes += buffer.getSize();
- spillWriter.writeBlock(buffer);
-
- if (bufferConsumer.isFinished()) {
+ int bufferSize = buffer.getSize();
+ spilledBytes += bufferSize;
+
+ // NOTE we may be in the process of finishing the subpartition where any buffer should
+ // be treated as if it was finished!
+ if (bufferConsumer.isFinished() || forceFinishRemainingBuffers) {
+ if (bufferSize > 0) {
+ spillWriter.writeBlock(buffer);
+ } else {
+ // If we skip a buffer for the spill writer, we need to adapt the backlog accordingly
+ decreaseBuffersInBacklog(buffer);
+ buffer.recycleBuffer();
+ }
bufferConsumer.close();
buffers.poll();
- }
- else {
+ } else {
+ // If there is already data, we need to spill it anyway, since we do not get this
+ // slice from the buffer consumer again during the next build.
+ // BEWARE: by doing so, we increase the actual number of buffers in the spill writer!
+ if (bufferSize > 0) {
+ spillWriter.writeBlock(buffer);
+ increaseBuffersInBacklog(bufferConsumer);
+ } else {
+ buffer.recycleBuffer();
+ }
+
return spilledBytes;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
index 7beb18f..9706a86 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferBuilderTestUtils.java
@@ -47,6 +47,10 @@ public class BufferBuilderTestUtils {
BufferBuilder bufferBuilder = new BufferBuilder(
MemorySegmentFactory.allocateUnpooledSegment(size),
FreeingBufferRecycler.INSTANCE);
+ return fillBufferBuilder(bufferBuilder, dataSize);
+ }
+
+ public static BufferBuilder fillBufferBuilder(BufferBuilder bufferBuilder, int dataSize) {
bufferBuilder.appendAndCommit(ByteBuffer.allocate(dataSize));
return bufferBuilder;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/388a083c/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index 09e0291..2e47379 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -42,7 +42,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -51,6 +51,7 @@ import java.util.concurrent.Future;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferConsumer;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.fillBufferBuilder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@@ -257,6 +258,85 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
}
/**
+ * Tests that a spilled partition is correctly read back in via a spilled read view. The
+ * partition went into spilled state before adding buffers and the access pattern resembles
+ * the actual use of {@link org.apache.flink.runtime.io.network.api.writer.RecordWriter}.
+ */
+ @Test
+ public void testConsumeSpilledPartitionSpilledBeforeAdd() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ assertEquals(0, partition.releaseMemory()); // <---- SPILL to disk
+
+ BufferBuilder[] bufferBuilders = new BufferBuilder[] {
+ createBufferBuilder(BUFFER_DATA_SIZE),
+ createBufferBuilder(BUFFER_DATA_SIZE),
+ createBufferBuilder(BUFFER_DATA_SIZE),
+ createBufferBuilder(BUFFER_DATA_SIZE)
+ };
+ BufferConsumer[] bufferConsumers = Arrays.stream(bufferBuilders).map(
+ BufferBuilder::createBufferConsumer
+ ).toArray(BufferConsumer[]::new);
+
+ BufferConsumer eventBufferConsumer =
+ EventSerializer.toBufferConsumer(new CancelCheckpointMarker(1));
+ final int eventSize = eventBufferConsumer.getWrittenBytes();
+
+ // note: only the newest buffer may be unfinished!
+ partition.add(bufferConsumers[0]);
+ fillBufferBuilder(bufferBuilders[0], BUFFER_DATA_SIZE).finish();
+ partition.add(bufferConsumers[1]);
+ fillBufferBuilder(bufferBuilders[1], BUFFER_DATA_SIZE).finish();
+ partition.add(eventBufferConsumer);
+ partition.add(bufferConsumers[2]);
+ bufferBuilders[2].finish(); // remains empty
+ partition.add(bufferConsumers[3]);
+ // last one: partially filled, unfinished
+ fillBufferBuilder(bufferBuilders[3], BUFFER_DATA_SIZE / 2);
+ // finished buffers only:
+ int expectedSize = BUFFER_DATA_SIZE * 2 + eventSize;
+
+ // now the bufferConsumer may be freed, depending on the timing of the write operation
+ // -> let's do this check at the end of the test (to save some time)
+ // still same statistics
+ assertEquals(5, partition.getTotalNumberOfBuffers());
+ assertEquals(3, partition.getBuffersInBacklog());
+ assertEquals(expectedSize, partition.getTotalNumberOfBytes());
+
+ partition.finish();
+ expectedSize += BUFFER_DATA_SIZE / 2; // previously unfinished buffer
+ expectedSize += 4; // + one EndOfPartitionEvent
+ assertEquals(6, partition.getTotalNumberOfBuffers());
+ assertEquals(3, partition.getBuffersInBacklog());
+ assertEquals(expectedSize, partition.getTotalNumberOfBytes());
+ Arrays.stream(bufferConsumers).forEach(bufferConsumer -> assertTrue(bufferConsumer.isRecycled()));
+
+ AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
+ SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(listener);
+
+ assertEquals(1, listener.getNumNotifications());
+
+ assertFalse(reader.nextBufferIsEvent()); // full buffer
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 2, false, true);
+ assertEquals(2, partition.getBuffersInBacklog());
+
+ assertFalse(reader.nextBufferIsEvent()); // full buffer
+ assertNextBuffer(reader, BUFFER_DATA_SIZE, true, 1, true, true);
+ assertEquals(1, partition.getBuffersInBacklog());
+
+ assertTrue(reader.nextBufferIsEvent()); // event
+ assertNextEvent(reader, eventSize, CancelCheckpointMarker.class, true, 1, false, true);
+ assertEquals(1, partition.getBuffersInBacklog());
+
+ assertFalse(reader.nextBufferIsEvent()); // partial buffer
+ assertNextBuffer(reader, BUFFER_DATA_SIZE / 2, true, 0, true, true);
+ assertEquals(0, partition.getBuffersInBacklog());
+
+ assertTrue(reader.nextBufferIsEvent()); // end of partition event
+ assertNextEvent(reader, 4, EndOfPartitionEvent.class, false, 0, false, true);
+ assertEquals(0, partition.getBuffersInBacklog());
+ }
+
+ /**
* Tests that a spilled partition is correctly read back in via a spilled
* read view.
*/
@@ -668,19 +748,41 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
}
/**
- * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers()} spilled bytes counting.
+ * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} spilled bytes and
+ * buffers counting.
*/
@Test
- public void testSpillFinishedBufferConsumers() throws Exception {
+ public void testSpillFinishedBufferConsumersFull() throws Exception {
SpillableSubpartition partition = createSubpartition();
BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE);
partition.add(bufferBuilder.createBufferConsumer());
assertEquals(0, partition.releaseMemory());
+ assertEquals(1, partition.getBuffersInBacklog());
+ // finally fill the buffer with some bytes
+ fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
+ assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
+ assertEquals(1, partition.getBuffersInBacklog());
+ }
+
+ /**
+ * Tests {@link SpillableSubpartition#spillFinishedBufferConsumers} spilled bytes and
+ * buffers counting with partially filled buffers.
+ */
+ @Test
+ public void testSpillFinishedBufferConsumersPartial() throws Exception {
+ SpillableSubpartition partition = createSubpartition();
+ BufferBuilder bufferBuilder = createBufferBuilder(BUFFER_DATA_SIZE * 2);
+
+ partition.add(bufferBuilder.createBufferConsumer());
+ fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE);
+
+ assertEquals(0, partition.releaseMemory());
+ assertEquals(2, partition.getBuffersInBacklog()); // partial one spilled, buffer consumer still enqueued
// finally fill the buffer with some bytes
- bufferBuilder.appendAndCommit(ByteBuffer.allocate(BUFFER_DATA_SIZE));
- bufferBuilder.finish(); // so that this buffer can be removed from the queue
- assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers());
+ fillBufferBuilder(bufferBuilder, BUFFER_DATA_SIZE).finish();
+ assertEquals(BUFFER_DATA_SIZE, partition.spillFinishedBufferConsumers(false));
+ assertEquals(2, partition.getBuffersInBacklog());
}
/**
[2/4] flink git commit: [hotfix][network] extend logging message in
SpillableSubpartition
Posted by sr...@apache.org.
[hotfix][network] extend logging message in SpillableSubpartition
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/75e5953b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/75e5953b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/75e5953b
Branch: refs/heads/master
Commit: 75e5953b91cf19f394e7db9ec5813f59203a7e32
Parents: 5ce3ff6
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Fri Apr 6 19:36:15 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Apr 19 10:56:12 2018 +0200
----------------------------------------------------------------------
.../runtime/io/network/partition/SpillableSubpartition.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/75e5953b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index 6b731d4..7f92a34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -229,10 +229,12 @@ class SpillableSubpartition extends ResultSubpartition {
int numberOfBuffers = buffers.size();
long spilledBytes = spillFinishedBufferConsumers();
+ int spilledBuffers = numberOfBuffers - buffers.size();
- LOG.debug("Spilling {} bytes for sub partition {} of {}.", spilledBytes, index, parent.getPartitionId());
+ LOG.debug("Spilling {} bytes ({} buffers) for sub partition {} of {}.",
+ spilledBytes, spilledBuffers, index, parent.getPartitionId());
- return numberOfBuffers - buffers.size();
+ return spilledBuffers;
}
}