You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/12/02 08:43:09 UTC

flink git commit: [FLINK-5169] [network] Fix spillable subpartition buffer count

Repository: flink
Updated Branches:
  refs/heads/release-1.1 9c058871f -> 2bf87228e


[FLINK-5169] [network] Fix spillable subpartition buffer count


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2bf87228
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2bf87228
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2bf87228

Branch: refs/heads/release-1.1
Commit: 2bf87228e454383f5fe90b1bb36341181aa7b2f3
Parents: 9c05887
Author: Ufuk Celebi <uc...@apache.org>
Authored: Thu Dec 1 18:38:30 2016 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Dec 1 21:41:55 2016 +0100

----------------------------------------------------------------------
 .../netty/SequenceNumberingViewReader.java      |  10 ++
 .../partition/SpillableSubpartition.java        |   5 +
 .../partition/SpillableSubpartitionView.java    |  22 +++-
 .../partition/SpilledSubpartitionView.java      |  13 +-
 .../partition/SpillableSubpartitionTest.java    | 130 +++++++++++++++++++
 5 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index ef611eb..5036bb7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -127,4 +127,14 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener {
 			requestQueue.notifyReaderNonEmpty(this);
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "SequenceNumberingViewReader{" +
+			"requestLock=" + requestLock +
+			", receiverId=" + receiverId +
+			", numBuffersAvailable=" + numBuffersAvailable.get() +
+			", sequenceNumber=" + sequenceNumber +
+			'}';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
index b584ebb..efe6884 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java
@@ -91,6 +91,11 @@ class SpillableSubpartition extends ResultSubpartition {
 				return false;
 			}
 
+			// The number of buffers are needed later when creating
+			// the read views. If you ever remove this line here,
+			// make sure to still count the number of buffers.
+			updateStatistics(buffer);
+
 			if (spillWriter == null) {
 				buffers.add(buffer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
index 8119ecc..533f95b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -30,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 class SpillableSubpartitionView implements ResultSubpartitionView {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SpillableSubpartitionView.class);
+
 	/** The subpartition this view belongs to. */
 	private final SpillableSubpartition parent;
 
@@ -51,6 +55,9 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
 	private final AtomicBoolean isReleased = new AtomicBoolean(false);
 
+	/** Remember the number of buffers this view was created with. */
+	private final long numBuffers;
+
 	/**
 	 * The next buffer to hand out. Everytime this is set to a non-null value,
 	 * a listener notification happens.
@@ -73,6 +80,7 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 		this.listener = checkNotNull(listener);
 
 		synchronized (buffers) {
+			numBuffers = buffers.size();
 			nextBuffer = buffers.poll();
 		}
 
@@ -94,9 +102,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 				// Create the spill writer and write all buffers to disk
 				BufferFileWriter spillWriter = ioManager.createBufferFileWriter(ioManager.createChannel());
 
+				long spilledBytes = 0;
+
 				int numBuffers = buffers.size();
 				for (int i = 0; i < numBuffers; i++) {
 					Buffer buffer = buffers.remove();
+					spilledBytes += buffer.getSize();
 					try {
 						spillWriter.writeBlock(buffer);
 					} finally {
@@ -111,6 +122,11 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 					numBuffers,
 					listener);
 
+				LOG.debug("Spilling {} bytes for sub partition {} of {}.",
+					spilledBytes,
+					parent.index,
+					parent.parent.getPartitionId());
+
 				return numBuffers;
 			}
 		}
@@ -188,8 +204,12 @@ class SpillableSubpartitionView implements ResultSubpartitionView {
 
 	@Override
 	public String toString() {
-		return String.format("SpillableSubpartitionView(index: %d) of ResultPartition %s",
+		boolean hasSpilled = spilledView != null;
+
+		return String.format("SpillableSubpartitionView(index: %d, buffers: %d, spilled? {}) of ResultPartition %s",
 			parent.index,
+			numBuffers,
+			hasSpilled,
 			parent.parent.getPartitionId());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index b087a4e..7488132 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.SynchronousBufferFileReader;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.util.event.NotificationListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
@@ -46,6 +48,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 class SpilledSubpartitionView implements ResultSubpartitionView, NotificationListener {
 
+	private static final Logger LOG = LoggerFactory.getLogger(SpilledSubpartitionView.class);
+
 	/** The subpartition this view belongs to. */
 	private final ResultSubpartition parent;
 
@@ -91,6 +95,9 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 		if (!spillWriter.registerAllRequestsProcessedListener(this)) {
 			isSpillInProgress = false;
 			availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+			LOG.debug("No spilling in progress. Notified about {} available buffers.", numberOfSpilledBuffers);
+		} else {
+			LOG.debug("Spilling in progress. Waiting with notification about {} available buffers.", numberOfSpilledBuffers);
 		}
 	}
 
@@ -103,6 +110,7 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 	public void onNotification() {
 		isSpillInProgress = false;
 		availabilityListener.notifyBuffersAvailable(numberOfSpilledBuffers);
+		LOG.debug("Finished spilling. Notified about {} available buffers.", numberOfSpilledBuffers);
 	}
 
 	@Override
@@ -158,7 +166,10 @@ class SpilledSubpartitionView implements ResultSubpartitionView, NotificationLis
 
 	@Override
 	public String toString() {
-		return String.format("SpilledSubpartitionView[sync](index: %d) of ResultPartition %s", parent.index, parent.parent.getPartitionId());
+		return String.format("SpilledSubpartitionView(index: %d, buffers: {}) of ResultPartition %s",
+			parent.index,
+			numberOfSpilledBuffers,
+			parent.parent.getPartitionId());
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/2bf87228/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
index b7a54d7..b53ef68 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.util.TestInfiniteBufferProvider;
 import org.junit.AfterClass;
 import org.junit.Test;
@@ -36,12 +40,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class SpillableSubpartitionTest extends SubpartitionTestBase {
@@ -153,4 +161,126 @@ public class SpillableSubpartitionTest extends SubpartitionTestBase {
 
 		assertNull(readView.getNextBuffer());
 	}
+
+	/**
+	 * Tests that a spilled partition is correctly read back in via a spilled
+	 * read view.
+	 */
+	@Test
+	public void testConsumeSpilledPartition() throws Exception {
+		ResultPartition parent = mock(ResultPartition.class);
+		SpillableSubpartition partition = new SpillableSubpartition(
+			0,
+			parent,
+			ioManager);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+		buffer.retain();
+		buffer.retain();
+
+		partition.add(buffer);
+		partition.add(buffer);
+		partition.add(buffer);
+
+		assertEquals(3, partition.releaseMemory());
+
+		partition.finish();
+
+		BufferAvailabilityListener listener = mock(BufferAvailabilityListener.class);
+		SpilledSubpartitionView reader = (SpilledSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+		verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
+
+		Buffer read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+
+		// End of partition
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		read.recycle();
+	}
+
+	/**
+	 * Tests that a spilled partition is correctly read back in via a spilled
+	 * read view.
+	 */
+	@Test
+	public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception {
+		ResultPartition parent = mock(ResultPartition.class);
+		SpillableSubpartition partition = new SpillableSubpartition(
+			0,
+			parent,
+			ioManager);
+
+		Buffer buffer = new Buffer(MemorySegmentFactory.allocateUnpooledSegment(4096), FreeingBufferRecycler.INSTANCE);
+		buffer.retain();
+		buffer.retain();
+
+		partition.add(buffer);
+		partition.add(buffer);
+		partition.add(buffer);
+		partition.finish();
+
+		AwaitableBufferAvailablityListener listener = new AwaitableBufferAvailablityListener();
+		SpillableSubpartitionView reader = (SpillableSubpartitionView) partition.createReadView(new TestInfiniteBufferProvider(), listener);
+
+		// Initial notification
+		assertEquals(1, listener.getNumNotifiedBuffers());
+
+		Buffer read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+		assertEquals(2, listener.getNumNotifiedBuffers());
+
+		// Spill now
+		assertEquals(2, partition.releaseMemory());
+
+		listener.awaitNotifications(4, 30_000);
+		assertEquals(4, listener.getNumNotifiedBuffers());
+
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		read.recycle();
+
+		// End of partition
+		read = reader.getNextBuffer();
+		assertNotNull(read);
+		assertEquals(EndOfPartitionEvent.class, EventSerializer.fromBuffer(read, ClassLoader.getSystemClassLoader()).getClass());
+		read.recycle();
+	}
+
+	private static class AwaitableBufferAvailablityListener implements BufferAvailabilityListener {
+
+		private long numNotifiedBuffers;
+
+		@Override
+		public void notifyBuffersAvailable(long numBuffers) {
+			numNotifiedBuffers += numBuffers;
+		}
+
+		long getNumNotifiedBuffers() {
+			return numNotifiedBuffers;
+		}
+
+		void awaitNotifications(long awaitedNumNotifiedBuffers, long timeoutMillis) throws InterruptedException {
+			long deadline = System.currentTimeMillis() + timeoutMillis;
+			while (numNotifiedBuffers < awaitedNumNotifiedBuffers && System.currentTimeMillis() < deadline) {
+				Thread.sleep(1);
+			}
+		}
+	}
 }