You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:43:09 UTC
flink git commit: [FLINK-5169] [network] Fix spillable subpartition
buffer count
Repository: flink
Updated Branches:
refs/heads/release-1.1 9c058871f -> 2bf87228e
[FLINK-5169] [network] Fix spillable subpartition buffer count
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bf87228
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bf87228
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bf87228
Branch: refs/heads/release-1.1
Commit: 2bf87228e454383f5fe90b1bb36341181aa7b2f3
Parents: 9c05887
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Dec 1 18:38:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:41:55 2016 +0100
----------------------------------------------------------------------
.../netty/SequenceNumberingViewReader.java | 10 ++
.../partition/SpillableSubpartition.java | 5 +
.../partition/SpillableSubpartitionView.java | 22 +++-
.../partition/SpilledSubpartitionView.java | 13 +-
.../partition/SpillableSubpartitionTest.java | 130 +++++++++++++++++++
5 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index ef611eb..5036bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -127,4 +127,14 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
requestQueue.notifyReaderNonEmpty(this);
}
}
+
+ @Override
+ public String toString() {
+ return "SequenceNumberingViewReader{" +
+ "requestLock=" + requestLock +
+ ", receiverId=" + receiverId +
+ ", numBuffersAvailable=" + numBuffersAvailable.get() +
+ ", sequenceNumber=" + sequenceNumber +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index b584ebb..efe6884 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -91,6 +91,11 @@ class SpillableSubpartition extends ResultSubpartition {
return false;
}
+ // The number of buffers is needed later when creating
+ // the read views. If you ever remove this line here,
+ // make sure to still count the number of buffers.
+ updateStatistics(buffer);
+
if (spillWriter == null) {
buffers.add(buffer);
http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 8119ecc..533f95b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition;
import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -30,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
class SpillableSubpartitionView implements ResultSubpartitionView {
+ private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
+
/** The subpartition this view belongs to. */
private final SpillableSubpartition parent;
@@ -51,6 +55,9 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
private final AtomicBoolean isReleased = new AtomicBoolean(false);
+ /** Remember the number of buffers this view was created with. */
+ private final long numBuffers;
+
/**
* The next buffer to hand out. Everytime this is set to a non-null value,
* a listener notification happens.
@@ -73,6 +80,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
this.listener = checkNotNull(listener);
synchronized (buffers) {
+ numBuffers = buffers.size();
nextBuffer = buffers.poll();
}
@@ -94,9 +102,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
// Create the spill writer and write all buffers to disk
BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
+ long spilledBytes = 0;
+
int numBuffers = buffers.size();
for (int i = 0; i < numBuffers; i++) {
Buffer buffer = buffers.remove();
+ spilledBytes += buffer.getSize();
try {
spillWriter.writeBlock(buffer);
} finally {
@@ -111,6 +122,11 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
numBuffers,
listener);
+ LOG.debug("Spilling {} bytes for sub partition {} of {}.",
+ spilledBytes,
+ parent.index,
+ parent.parent.getPartitionId());
+
return numBuffers;
}
}
@@ -188,8 +204,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
@Override
public String toString() {
- return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
+ // A non-null spilledView is reported as "spilled".
+ boolean hasSpilled = spilledView != null;
+
+ // String.format only understands java.util.Formatter conversions (no SLF4J "{}"); %b renders the boolean.
+ return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? %b) of ResultPartition %s",
parent.index,
+ numBuffers,
+ hasSpilled,
parent.parent.getPartitionId());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index b087a4e..7488132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.util.event.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -46,6 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
+ private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
+
/** The subpartition this view belongs to. */
private final ResultSubpartition parent;
@@ -91,6 +95,9 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
if (!spillWriter.registerAllRequestsProcessedListener(this)) {
isSpillInProgress = false;
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+ LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
+ } else {
+ LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
}
}
@@ -103,6 +110,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
public void onNotification() {
isSpillInProgress = false;
availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+ LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers);
}
@Override
@@ -158,7 +166,10 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
@Override
public String toString() {
- return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+ // String.format only understands java.util.Formatter conversions (no SLF4J "{}"); %d renders the buffer count.
+ return String.format("SpilledSubpartitionView(index: %d, buffers: %d) of ResultPartition %s",
+ parent.index,
+ numberOfSpilledBuffers,
+ parent.parent.getPartitionId());
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b7a54d7..b53ef68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,11 +18,15 @@
package org.apache.flink.runtime.io.network.partition;
+import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
import org.junit.AfterClass;
import org.junit.Test;
@@ -36,12 +40,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class SpillableSubpartitionTest extends SubpartitionTestBase {
@@ -153,4 +161,126 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
assertNull(readView.getNextBuffer());
}
+
+ /**
+ * Tests that a spilled partition is correctly read back in via a spilled
+ * read view.
+ */
+ @Test
+ public void testConsumeSpilledPartition() throws Exception {
+ ResultPartition parent = mock(ResultPartition.class);
+ SpillableSubpartition partition = new SpillableSubpartition(
+ 0,
+ parent,
+ ioManager);
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+ buffer.retain();
+ buffer.retain();
+
+ partition.add(buffer);
+ partition.add(buffer);
+ partition.add(buffer);
+
+ assertEquals(3, partition.releaseMemory());
+
+ partition.finish();
+
+ BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+ SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+ verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
+
+ Buffer read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+
+ // End of partition
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ read.recycle();
+ }
+
+ /**
+ * Tests that a spillable partition which is spilled while it is being
+ * consumed still hands out the remaining buffers and the end-of-partition event.
+ */
+ @Test
+ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
+ ResultPartition parent = mock(ResultPartition.class);
+ SpillableSubpartition partition = new SpillableSubpartition(
+ 0,
+ parent,
+ ioManager);
+
+ Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+ buffer.retain();
+ buffer.retain();
+
+ partition.add(buffer);
+ partition.add(buffer);
+ partition.add(buffer);
+ partition.finish();
+
+ AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
+ SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+ // Initial notification
+ assertEquals(1, listener.getNumNotifiedBuffers());
+
+ Buffer read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+ assertEquals(2, listener.getNumNotifiedBuffers());
+
+ // Spill now
+ assertEquals(2, partition.releaseMemory());
+
+ listener.awaitNotifications(4, 30_000);
+ assertEquals(4, listener.getNumNotifiedBuffers());
+
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ read.recycle();
+
+ // End of partition
+ read = reader.getNextBuffer();
+ assertNotNull(read);
+ assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+ read.recycle();
+ }
+
+ /**
+ * Test listener that sums up the number of buffers it was notified about
+ * and lets the test wait until a given total has been reached.
+ *
+ * <p>All access to the counter goes through this object's monitor, because
+ * notifications may arrive on a thread other than the test thread (the
+ * test waits for them after triggering a spill).
+ */
+ private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
+
+ /** Total number of buffers notified so far; guarded by {@code this}. */
+ private long numNotifiedBuffers;
+
+ @Override
+ public synchronized void notifyBuffersAvailable(long numBuffers) {
+ numNotifiedBuffers += numBuffers;
+ // Wake up a thread blocked in awaitNotifications().
+ notifyAll();
+ }
+
+ synchronized long getNumNotifiedBuffers() {
+ return numNotifiedBuffers;
+ }
+
+ /**
+ * Blocks until at least {@code awaitedNumNotifiedBuffers} buffers have been
+ * notified or {@code timeoutMillis} has elapsed, whichever comes first.
+ * Returns silently on timeout; callers check the count afterwards.
+ */
+ synchronized void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException {
+ long deadline = System.currentTimeMillis() + timeoutMillis;
+ long remainingMillis = timeoutMillis;
+ // wait(0) would block forever, so only wait while time is left.
+ while (numNotifiedBuffers < awaitedNumNotifiedBuffers && remainingMillis > 0) {
+ wait(remainingMillis);
+ remainingMillis = deadline - System.currentTimeMillis();
+ }
+ }
+ }
}