You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/20 10:03:47 UTC

[1/3] flink git commit: [FLINK-1706] Resource cleanup added streaming readers with BarrierBuffers

Repository: flink
Updated Branches:
  refs/heads/master c0917f237 -> ceea93d41


[FLINK-1706] Resource cleanup added streaming readers with BarrierBuffers


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f625b8dd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f625b8dd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f625b8dd

Branch: refs/heads/master
Commit: f625b8ddadef2268e692905677e69a0adb1713ce
Parents: 650c528
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 18 19:22:16 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Mar 19 21:41:38 2015 +0100

----------------------------------------------------------------------
 .../api/streamvertex/CoStreamVertex.java        |  4 +-
 .../api/streamvertex/InputHandler.java          |  5 ++-
 .../api/streamvertex/StreamVertex.java          |  2 +-
 .../flink/streaming/io/BarrierBuffer.java       | 15 +++++++
 .../flink/streaming/io/BufferSpiller.java       | 10 ++++-
 .../flink/streaming/io/CoRecordReader.java      | 14 +++++--
 .../apache/flink/streaming/io/SpillReader.java  | 43 ++++++++++++--------
 .../io/StreamingAbstractRecordReader.java       |  8 +++-
 .../flink/streaming/io/StreamingReader.java     | 27 ++++++++++++
 .../flink/streaming/io/BarrierBufferTest.java   |  3 ++
 10 files changed, 104 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index 6957652..d794a35 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.io.CoRecordReader;
 import org.apache.flink.streaming.io.IndexedReaderIterator;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.io.IOException;
 import java.util.ArrayList;
 
 public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
@@ -63,9 +64,10 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 	}
 
 	@Override
-	public void clearBuffers() {
+	public void clearBuffers() throws IOException {
 		outputHandler.clearWriters();
 		coReader.clearBuffers();
+		coReader.cleanup();
 	}
 
 	protected void setConfigInputs() throws StreamVertexException {

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index d766705..726df53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
+import java.io.IOException;
+
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
@@ -86,9 +88,10 @@ public class InputHandler<IN> {
 		return inputIter;
 	}
 
-	public void clearReaders() {
+	public void clearReaders() throws IOException {
 		if (inputs != null) {
 			inputs.clearBuffers();
 		}
+		inputs.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 926aac2..30b69a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -191,7 +191,7 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 
 	}
 
-	protected void clearBuffers() {
+	protected void clearBuffers() throws IOException {
 		if (outputHandler != null) {
 			outputHandler.clearWriters();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index ea422e5..f349ac5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.io;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -225,4 +226,18 @@ public class BarrierBuffer {
 		blockChannel(bufferOrEvent.getChannelIndex());
 	}
 
+	public void cleanup() throws IOException {
+		bufferSpiller.close();
+		File spillfile1 = bufferSpiller.getSpillFile();
+		if (spillfile1 != null) {
+			spillfile1.delete();
+		}
+
+		spillReader.close();
+		File spillfile2 = spillReader.getSpillFile();
+		if (spillfile2 != null) {
+			spillfile2.delete();
+		}
+	}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
index e824430..b028ea7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
@@ -51,8 +51,14 @@ public class BufferSpiller {
 	 * Dumps the contents of the buffer to disk and recycles the buffer.
 	 */
 	public void spill(Buffer buffer) throws IOException {
-		spillingChannel.write(buffer.getNioBuffer());
-		buffer.recycle();
+		try {
+			spillingChannel.write(buffer.getNioBuffer());
+			buffer.recycle();
+		} catch (IOException e) {
+			close();
+			throw new IOException(e);
+		}
+
 	}
 
 	@SuppressWarnings("resource")

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index c32db4e..25cb25d 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
  */
 @SuppressWarnings("rawtypes")
 public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends
-		AbstractReader implements EventListener<InputGate> {
+		AbstractReader implements EventListener<InputGate>, StreamingReader {
 
 	private final InputGate bufferReader1;
 
@@ -232,8 +232,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	public void onEvent(InputGate bufferReader) {
 		addToAvailable(bufferReader);
 	}
-	
-	protected void addToAvailable(InputGate bufferReader){
+
+	protected void addToAvailable(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
 			availableRecordReaders.add(1);
 		} else if (bufferReader == bufferReader2) {
@@ -278,4 +278,12 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	}
 
+	public void cleanup() throws IOException {
+		try {
+			barrierBuffer1.cleanup();
+		} finally {
+			barrierBuffer2.cleanup();
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
index 3526bea..3cb83d4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
@@ -38,28 +38,33 @@ public class SpillReader {
 	 * 
 	 */
 	public Buffer readNextBuffer(int bufferSize, BufferPool bufferPool) throws IOException {
+		try {
+			Buffer buffer = null;
 
-		Buffer buffer = null;
+			// If available tries to request a new buffer from the pool
+			if (bufferPool != null) {
+				buffer = bufferPool.requestBuffer();
+			}
 
-		// If available tries to request a new buffer from the pool
-		if (bufferPool != null) {
-			buffer = bufferPool.requestBuffer();
-		}
-
-		// If no bufferpool provided or the pool was empty create a new buffer
-		if (buffer == null) {
-			buffer = new Buffer(new MemorySegment(new byte[bufferSize]), new BufferRecycler() {
+			// If no bufferpool provided or the pool was empty create a new
+			// buffer
+			if (buffer == null) {
+				buffer = new Buffer(new MemorySegment(new byte[bufferSize]), new BufferRecycler() {
 
-				@Override
-				public void recycle(MemorySegment memorySegment) {
-					memorySegment.free();
-				}
-			});
-		}
+					@Override
+					public void recycle(MemorySegment memorySegment) {
+						memorySegment.free();
+					}
+				});
+			}
 
-		spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
+			spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
 
-		return buffer;
+			return buffer;
+		} catch (Exception e) {
+			close();
+			throw new IOException(e);
+		}
 	}
 
 	@SuppressWarnings("resource")
@@ -73,6 +78,10 @@ public class SpillReader {
 		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
 	}
 
+	public File getSpillFile() {
+		return spillFile;
+	}
+
 	public void close() throws IOException {
 		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
 			this.spillingChannel.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
index 8296010..c5ffa62 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingAbstractRecordReader.java
@@ -43,8 +43,8 @@ import org.slf4j.LoggerFactory;
  * @param <T>
  *            The type of the record that can be read with this record reader.
  */
-public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends AbstractReader implements
-		ReaderBase {
+public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable> extends
+		AbstractReader implements ReaderBase, StreamingReader {
 
 	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamingAbstractRecordReader.class);
@@ -122,4 +122,8 @@ public abstract class StreamingAbstractRecordReader<T extends IOReadableWritable
 			}
 		}
 	}
+
+	public void cleanup() throws IOException {
+		barrierBuffer.cleanup();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
new file mode 100644
index 0000000..74b986a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/StreamingReader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+public interface StreamingReader {
+
+	public void cleanup() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f625b8dd/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 2b8a218..d874902 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -54,6 +54,7 @@ public class BarrierBufferTest {
 		assertEquals(input.get(1), bb.getNextNonBlocked());
 		assertEquals(input.get(2), bb.getNextNonBlocked());
 
+		bb.cleanup();
 	}
 
 	@Test
@@ -84,6 +85,7 @@ public class BarrierBufferTest {
 		bb.processSuperstep(nextBoe);
 		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
 
+		bb.cleanup();
 	}
 
 	@Test
@@ -130,6 +132,7 @@ public class BarrierBufferTest {
 		check(input.get(6), nextBoe = bb.getNextNonBlocked());
 		check(input.get(9), nextBoe = bb.getNextNonBlocked());
 
+		bb.cleanup();
 	}
 
 	private static void check(BufferOrEvent expected, BufferOrEvent actual) {


[2/3] flink git commit: [FLINK-1706] Spilling BarrierBuffer added + basic tests

Posted by gy...@apache.org.
[FLINK-1706] Spilling BarrierBuffer added + basic tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/650c528f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/650c528f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/650c528f

Branch: refs/heads/master
Commit: 650c528fc4452e6b11294062699001a5b4a21ac9
Parents: c0917f2
Author: Gyula Fora <gy...@apache.org>
Authored: Tue Mar 17 17:39:27 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Mar 19 21:41:38 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/io/network/buffer/Buffer.java |  4 +
 .../runtime/io/network/buffer/BufferTest.java   |  1 +
 .../api/streamvertex/StreamingSuperstep.java    |  8 ++
 .../flink/streaming/io/BarrierBuffer.java       | 54 +++++++----
 .../flink/streaming/io/BufferSpiller.java       | 85 ++++++++++++++++++
 .../apache/flink/streaming/io/SpillReader.java  | 82 +++++++++++++++++
 .../streaming/io/SpillingBufferOrEvent.java     | 94 ++++++++++++++++++++
 .../flink/streaming/io/BarrierBufferTest.java   | 34 ++++---
 .../streaming/io/SpillingBufferOrEventTest.java | 94 ++++++++++++++++++++
 9 files changed, 424 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index 2ed82fa..2642521 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -89,6 +89,10 @@ public class Buffer {
 			return memorySegment.wrap(0, currentSize).duplicate();
 		}
 	}
+	
+	public BufferRecycler getRecycler(){
+		return recycler;
+	}
 
 	public int getSize() {
 		synchronized (recycleLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
index 2630608..f2f9c09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferTest.java
@@ -50,4 +50,5 @@ public class BufferTest {
 			// OK => expected exception
 		}
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
index 557c636..d46ca79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamingSuperstep.java
@@ -49,4 +49,12 @@ public class StreamingSuperstep extends TaskEvent {
 	public long getId() {
 		return id;
 	}
+
+	public boolean equals(Object other) {
+		if (other == null || !(other instanceof StreamingSuperstep)) {
+			return false;
+		} else {
+			return ((StreamingSuperstep) other).id == this.id;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
index 42d4919..ea422e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BarrierBuffer.java
@@ -43,8 +43,8 @@ public class BarrierBuffer {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
 
-	private Queue<BufferOrEvent> nonprocessed = new LinkedList<BufferOrEvent>();
-	private Queue<BufferOrEvent> blockedNonprocessed = new LinkedList<BufferOrEvent>();
+	private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
+	private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
 
 	private Set<Integer> blockedChannels = new HashSet<Integer>();
 	private int totalNumberOfInputChannels;
@@ -56,10 +56,20 @@ public class BarrierBuffer {
 
 	private InputGate inputGate;
 
+	private SpillReader spillReader;
+	private BufferSpiller bufferSpiller;
+
 	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
 		this.inputGate = inputGate;
 		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
 		this.reader = reader;
+		try {
+			this.bufferSpiller = new BufferSpiller();
+			this.spillReader = new SpillReader();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
 	}
 
 	/**
@@ -77,28 +87,23 @@ public class BarrierBuffer {
 	}
 
 	/**
-	 * Buffers a bufferOrEvent received from a blocked channel
-	 * 
-	 * @param bufferOrEvent
-	 *            bufferOrEvent to buffer
-	 */
-	protected void store(BufferOrEvent bufferOrEvent) {
-		nonprocessed.add(bufferOrEvent);
-	}
-
-	/**
 	 * Get then next non-blocked non-processed BufferOrEvent. Returns null if
 	 * not available.
+	 * 
+	 * @throws IOException
 	 */
-	protected BufferOrEvent getNonProcessed() {
-		BufferOrEvent nextNonprocessed;
+	protected BufferOrEvent getNonProcessed() throws IOException {
+		SpillingBufferOrEvent nextNonprocessed;
+
 		while ((nextNonprocessed = nonprocessed.poll()) != null) {
-			if (isBlocked(nextNonprocessed.getChannelIndex())) {
-				blockedNonprocessed.add(nextNonprocessed);
+			BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
+			if (isBlocked(boe.getChannelIndex())) {
+				blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
 			} else {
-				return nextNonprocessed;
+				return boe;
 			}
 		}
+
 		return null;
 	}
 
@@ -137,7 +142,8 @@ public class BarrierBuffer {
 				bufferOrEvent = inputGate.getNextBufferOrEvent();
 				if (isBlocked(bufferOrEvent.getChannelIndex())) {
 					// If channel blocked we just store it
-					blockedNonprocessed.add(bufferOrEvent);
+					blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent, bufferSpiller,
+							spillReader));
 				} else {
 					return bufferOrEvent;
 				}
@@ -168,6 +174,8 @@ public class BarrierBuffer {
 
 	/**
 	 * Releases the blocks on all channels.
+	 * 
+	 * @throws IOException
 	 */
 	protected void releaseBlocks() {
 		if (!nonprocessed.isEmpty()) {
@@ -175,7 +183,15 @@ public class BarrierBuffer {
 			throw new RuntimeException("Error in barrier buffer logic");
 		}
 		nonprocessed = blockedNonprocessed;
-		blockedNonprocessed = new LinkedList<BufferOrEvent>();
+		blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+
+		try {
+			spillReader.setSpillFile(bufferSpiller.getSpillFile());
+			bufferSpiller.resetSpillFile();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
 		blockedChannels.clear();
 		superstepStarted = false;
 		if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
new file mode 100644
index 0000000..e824430
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/BufferSpiller.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.util.StringUtils;
+
+public class BufferSpiller {
+
+	protected static Random rnd = new Random();
+
+	private File spillFile;
+	protected FileChannel spillingChannel;
+	private String tempDir;
+
+	public BufferSpiller() throws IOException {
+		String tempDirString = GlobalConfiguration.getString(
+				ConfigConstants.TASK_MANAGER_TMP_DIR_KEY,
+				ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
+		String[] tempDirs = tempDirString.split(",|" + File.pathSeparator);
+
+		tempDir = tempDirs[rnd.nextInt(tempDirs.length)];
+
+		createSpillingChannel();
+	}
+
+	/**
+	 * Dumps the contents of the buffer to disk and recycles the buffer.
+	 */
+	public void spill(Buffer buffer) throws IOException {
+		spillingChannel.write(buffer.getNioBuffer());
+		buffer.recycle();
+	}
+
+	@SuppressWarnings("resource")
+	private void createSpillingChannel() throws IOException {
+		this.spillFile = new File(tempDir, randomString(rnd) + ".buffer");
+		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	}
+
+	private static String randomString(Random random) {
+		final byte[] bytes = new byte[20];
+		random.nextBytes(bytes);
+		return StringUtils.byteToHexString(bytes);
+	}
+
+	public void close() throws IOException {
+		if (spillingChannel != null && spillingChannel.isOpen()) {
+			spillingChannel.close();
+		}
+	}
+
+	public void resetSpillFile() throws IOException {
+		close();
+		createSpillingChannel();
+	}
+
+	public File getSpillFile() {
+		return spillFile;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
new file mode 100644
index 0000000..3526bea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillReader.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+
+public class SpillReader {
+
+	private FileChannel spillingChannel;
+	private File spillFile;
+
+	/**
+	 * Reads the next buffer from the spilled file. If a buffer pool was given,
+	 * uses the buffer pool to request a new buffer to read into.
+	 * 
+	 */
+	public Buffer readNextBuffer(int bufferSize, BufferPool bufferPool) throws IOException {
+
+		Buffer buffer = null;
+
+		// If available tries to request a new buffer from the pool
+		if (bufferPool != null) {
+			buffer = bufferPool.requestBuffer();
+		}
+
+		// If no bufferpool provided or the pool was empty create a new buffer
+		if (buffer == null) {
+			buffer = new Buffer(new MemorySegment(new byte[bufferSize]), new BufferRecycler() {
+
+				@Override
+				public void recycle(MemorySegment memorySegment) {
+					memorySegment.free();
+				}
+			});
+		}
+
+		spillingChannel.read(buffer.getMemorySegment().wrap(0, bufferSize));
+
+		return buffer;
+	}
+
+	@SuppressWarnings("resource")
+	public void setSpillFile(File nextSpillFile) throws IOException {
+		// We can close and delete the file now
+		close();
+		if (spillFile != null) {
+			spillFile.delete();
+		}
+		this.spillFile = nextSpillFile;
+		this.spillingChannel = new RandomAccessFile(spillFile, "rw").getChannel();
+	}
+
+	public void close() throws IOException {
+		if (this.spillingChannel != null && this.spillingChannel.isOpen()) {
+			this.spillingChannel.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
new file mode 100644
index 0000000..40713e2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/SpillingBufferOrEvent.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+/**
+ * Holds a {@link BufferOrEvent} either in memory or, if the buffer pool it came
+ * from has no free buffers left, spilled to disk through a
+ * {@link BufferSpiller}. Spilled buffers are read back through the given
+ * {@link SpillReader} when {@link #getBufferOrEvent()} is called.
+ */
+public class SpillingBufferOrEvent {
+
+	// The wrapped element; null once it has been spilled
+	private BufferOrEvent boe;
+	private boolean isSpilled = false;
+
+	private final SpillReader spillReader;
+
+	// Pool the original buffer belonged to, set only if its recycler is a BufferPool
+	private BufferPool bufferPool;
+
+	// Channel and size of the original element, needed to rebuild it after spilling
+	private final int channelIndex;
+	private int bufferSize;
+
+	/**
+	 * Wraps the given element and spills its buffer right away if
+	 * {@link #shouldSpill()} decides so.
+	 * 
+	 * @param boe
+	 *            the buffer or event to hold
+	 * @param spiller
+	 *            used to write the buffer to disk if it needs to be spilled
+	 * @param reader
+	 *            used to read the buffer back if it was spilled
+	 * @throws IOException
+	 *             if spilling the buffer fails
+	 */
+	public SpillingBufferOrEvent(BufferOrEvent boe, BufferSpiller spiller, SpillReader reader)
+			throws IOException {
+
+		this.boe = boe;
+		this.spillReader = reader;
+		// Remember the channel so a spilled buffer is returned on the channel
+		// it arrived on instead of always on channel 0
+		this.channelIndex = boe.getChannelIndex();
+
+		if (shouldSpill()) {
+			spiller.spill(boe.getBuffer());
+			isSpilled = true;
+			// Clear the field (not the parameter) so the spilled buffer is no
+			// longer referenced from this object
+			this.boe = null;
+		}
+	}
+
+	/**
+	 * If the buffer wasn't spilled simply returns the instance from the field,
+	 * otherwise reads it from the spill reader. For a spilled element every
+	 * call reads the next buffer from the spill reader.
+	 */
+	public BufferOrEvent getBufferOrEvent() throws IOException {
+		if (isSpilled) {
+			return new BufferOrEvent(spillReader.readNextBuffer(bufferSize, bufferPool),
+					channelIndex);
+		} else {
+			return boe;
+		}
+	}
+
+	/**
+	 * Checks whether a given buffer should be spilled to disk. Currently it
+	 * checks whether the buffer pool from which the buffer was supplied is
+	 * empty and only spills if it is. This avoids out of memory exceptions and
+	 * also blocks at the input gate.
+	 */
+	private boolean shouldSpill() throws IOException {
+		if (boe.isBuffer()) {
+			Buffer buffer = boe.getBuffer();
+			this.bufferSize = buffer.getSize();
+			BufferRecycler recycler = buffer.getRecycler();
+
+			if (recycler instanceof BufferPool) {
+				bufferPool = (BufferPool) recycler;
+				// Probe the pool: a null result means it has no free buffer
+				Buffer nextBuffer = bufferPool.requestBuffer();
+				if (nextBuffer == null) {
+					return true;
+				} else {
+					// Only probing, give the buffer straight back
+					nextBuffer.recycle();
+				}
+			}
+		}
+
+		return false;
+	}
+
+	/**
+	 * @return true if the wrapped buffer was written to disk
+	 */
+	public boolean isSpilled() {
+		return isSpilled;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
index 1b4cc36..2b8a218 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
@@ -110,28 +110,36 @@ public class BarrierBufferTest {
 		BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
 		BufferOrEvent nextBoe;
 
-		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+		check(input.get(0), nextBoe = bb.getNextNonBlocked());
+		check(input.get(1), nextBoe = bb.getNextNonBlocked());
+		check(input.get(2), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(7), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(8), nextBoe = bb.getNextNonBlocked());
+		check(input.get(7), nextBoe = bb.getNextNonBlocked());
+		check(input.get(8), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+		check(input.get(3), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(10), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(11), nextBoe = bb.getNextNonBlocked());
+		check(input.get(10), nextBoe = bb.getNextNonBlocked());
+		check(input.get(11), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+		check(input.get(4), nextBoe = bb.getNextNonBlocked());
+		check(input.get(5), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(12), nextBoe = bb.getNextNonBlocked());
+		check(input.get(12), nextBoe = bb.getNextNonBlocked());
 		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(9), nextBoe = bb.getNextNonBlocked());
+		check(input.get(6), nextBoe = bb.getNextNonBlocked());
+		check(input.get(9), nextBoe = bb.getNextNonBlocked());
 
 	}
 
+	/**
+	 * Asserts that two elements agree on their kind (buffer or event) and on
+	 * their channel index, and for events also on the event itself. Buffer
+	 * contents are not compared.
+	 */
+	private static void check(BufferOrEvent expected, BufferOrEvent actual) {
+		assertEquals(expected.isBuffer(), actual.isBuffer());
+		assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
+
+		if (!expected.isEvent()) {
+			return;
+		}
+		assertEquals(expected.getEvent(), actual.getEvent());
+	}
+
 	protected static class MockInputGate implements InputGate {
 
 		private int numChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/650c528f/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
new file mode 100644
index 0000000..2f28f90
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.junit.Test;
+
+/**
+ * Tests that {@link SpillingBufferOrEvent} keeps buffers in memory while their
+ * pool still has free buffers, spills them once the pool is empty, and returns
+ * the original contents in both cases.
+ */
+public class SpillingBufferOrEventTest {
+
+	@Test
+	public void testSpilling() throws IOException, InterruptedException {
+		BufferSpiller bsp = new BufferSpiller();
+		SpillReader spr = new SpillReader();
+
+		try {
+			BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+			BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+
+			// First buffer of each pool: one buffer is still free, no spilling
+			SpillingBufferOrEvent sboe1 = wrap(pool1, 10000, bsp, spr);
+			assertFalse(sboe1.isSpilled());
+
+			SpillingBufferOrEvent sboe2 = wrap(pool2, 10000, bsp, spr);
+			assertFalse(sboe2.isSpilled());
+
+			// Second buffer of each pool: the pool is now empty, must spill
+			SpillingBufferOrEvent sboe3 = wrap(pool1, 50000, bsp, spr);
+			assertTrue(sboe3.isSpilled());
+
+			SpillingBufferOrEvent sboe4 = wrap(pool2, 60000, bsp, spr);
+			assertTrue(sboe4.isSpilled());
+
+			bsp.close();
+
+			spr.setSpillFile(bsp.getSpillFile());
+
+			// Elements must come back in order with their original contents
+			assertContent(10000, sboe1);
+			assertContent(10000, sboe2);
+			assertContent(50000, sboe3);
+			assertContent(60000, sboe4);
+		} finally {
+			// Release the read channel and remove the spill file even if an
+			// assertion above failed
+			spr.close();
+			bsp.getSpillFile().delete();
+		}
+	}
+
+	/**
+	 * Requests a buffer from the pool, writes the marker value at position 0
+	 * and wraps it (on channel 0) into a SpillingBufferOrEvent.
+	 */
+	private static SpillingBufferOrEvent wrap(BufferPool pool, int marker, BufferSpiller spiller,
+			SpillReader reader) throws IOException, InterruptedException {
+		Buffer buffer = pool.requestBuffer();
+		buffer.getMemorySegment().putInt(0, marker);
+		return new SpillingBufferOrEvent(new BufferOrEvent(buffer, 0), spiller, reader);
+	}
+
+	/**
+	 * Retrieves the buffer of the given element, checks the marker value at
+	 * position 0 and recycles the buffer.
+	 */
+	private static void assertContent(int expectedMarker, SpillingBufferOrEvent sboe)
+			throws IOException {
+		Buffer buffer = sboe.getBufferOrEvent().getBuffer();
+		assertEquals(expectedMarker, buffer.getMemorySegment().getInt(0));
+		buffer.recycle();
+	}
+}


[3/3] flink git commit: [FLINK-1706] IOTest added for BarrierBuffers

Posted by gy...@apache.org.
[FLINK-1706] IOTest added for BarrierBuffers

Closes #493


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ceea93d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ceea93d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ceea93d4

Branch: refs/heads/master
Commit: ceea93d41b6e8ae66cd96d03f2f27e0027f8ac4c
Parents: f625b8d
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Mar 18 23:41:08 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Fri Mar 20 09:21:11 2015 +0100

----------------------------------------------------------------------
 .../api/streamvertex/InputHandler.java          |   2 +-
 .../api/streamvertex/StreamVertex.java          |   5 +-
 .../flink/streaming/io/BarrierBufferIOTest.java | 159 +++++++++++++++++++
 3 files changed, 162 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 726df53..48a00d8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -91,7 +91,7 @@ public class InputHandler<IN> {
 	public void clearReaders() throws IOException {
 		if (inputs != null) {
 			inputs.clearBuffers();
+			inputs.cleanup();
 		}
-		inputs.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 30b69a7..dd8a463 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -114,9 +114,8 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTa
 		if (configuration.getStateMonitoring() && !states.isEmpty()) {
 			getEnvironment().getJobManager().tell(
 					new StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID, 
-							new LocalStateHandle(states)),
-					ActorRef.noSender());
+							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
+							new LocalStateHandle(states)), ActorRef.noSender());
 		} else {
 			getEnvironment().getJobManager().tell(
 					new BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ceea93d4/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
new file mode 100644
index 0000000..24106c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.junit.Test;
+
+/**
+ * Pushes a large number of buffers and barrier events from two mocked input
+ * channels through a {@link BarrierBuffer} and fails if any exception is
+ * thrown along the way.
+ */
+public class BarrierBufferIOTest {
+
+	@Test
+	public void IOTest() throws IOException, InterruptedException {
+
+		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
+				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+
+		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
+				new BarrierBufferTest.MockReader(myIG));
+
+		try {
+			for (int i = 0; i < 2000000; i++) {
+				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+				if (boe.isBuffer()) {
+					boe.getBuffer().recycle();
+				} else {
+					barrierBuffer.processSuperstep(boe);
+				}
+			}
+		} catch (Exception e) {
+			// Report what went wrong instead of failing without any detail
+			e.printStackTrace();
+			fail("Reading from the BarrierBuffer failed: " + e);
+		} finally {
+			barrierBuffer.cleanup();
+		}
+	}
+
+	/**
+	 * Emits a barrier with probability 1 / expectedEvery on each call.
+	 */
+	private static class RandomBarrier implements BarrierGenerator {
+		private static final Random rnd = new Random();
+
+		private final double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	/**
+	 * Emits a barrier on the first call and then on every "every"-th call
+	 * after it.
+	 */
+	private static class CountBarrier implements BarrierGenerator {
+
+		private final long every;
+		private long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	/**
+	 * Input gate that alternates between its channels on every read and
+	 * returns either a superstep event or a freshly requested buffer,
+	 * depending on the barrier generator of the current channel.
+	 */
+	protected static class MockInputGate implements InputGate {
+
+		private final int numChannels;
+		private final BufferPool[] bufferPools;
+		// Id of the last superstep emitted on each channel
+		private final int[] currentSupersteps;
+		private final BarrierGenerator[] barrierGens;
+		private int currentChannel = 0;
+		// Running counter written into every emitted buffer
+		private long c = 0;
+
+		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentSupersteps = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() throws IOException, InterruptedException {
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			// Round-robin over the channels
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+						currentChannel);
+			} else {
+				Buffer buffer = bufferPools[currentChannel].requestBuffer();
+				buffer.getMemorySegment().putLong(0, c++);
+
+				return new BufferOrEvent(buffer, currentChannel);
+			}
+
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {
+		}
+
+	}
+
+	/**
+	 * Decides for each read on a channel whether a barrier is emitted next.
+	 */
+	protected interface BarrierGenerator {
+		public boolean isNextBarrier();
+	}
+
+}