You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:40 UTC

[24/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
deleted file mode 100644
index d4fdc59..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ /dev/null
@@ -1,954 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.HeapMemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the behavior of the {@link BarrierBuffer}.
- */
-public class BarrierBufferTest {
-
-	private static final int PAGE_SIZE = 512;
-	
-	private static int SIZE_COUNTER = 0;
-	
-	private static IOManager IO_MANAGER;
-
-	@BeforeClass
-	public static void setup() {
-		IO_MANAGER = new IOManagerAsync();
-		SIZE_COUNTER = 1;
-	}
-
-	@AfterClass
-	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Tests
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Validates that the buffer behaves correctly if no checkpoint barriers come,
-	 * for a single input channel.
-	 */
-	@Test
-	public void testSingleChannelNoBarriers() {
-		try {
-			BufferOrEvent[] sequence = { 
-					createBuffer(0), createBuffer(0), createBuffer(0),
-					createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			for (BufferOrEvent boe : sequence) {
-				assertEquals(boe, buffer.getNextNonBlocked());
-			}
-			
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-			
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer behaves correctly if no checkpoint barriers come,
-	 * for an input with multiple input channels.
-	 */
-	@Test
-	public void testMultiChannelNoBarriers() {
-		try {
-			BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
-					createBuffer(1), createBuffer(0), createEndOfPartition(0),
-					createBuffer(3), createBuffer(1), createEndOfPartition(3),
-					createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			for (BufferOrEvent boe : sequence) {
-				assertEquals(boe, buffer.getNextNonBlocked());
-			}
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer preserved the order of elements for a 
-	 * input with a single input channel, and checkpoint events.
-	 */
-	@Test
-	public void testSingleChannelWithBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(0), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
-					createBarrier(2, 0), createBarrier(3, 0),
-					createBuffer(0), createBuffer(0),
-					createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
-					createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-			
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, buffer.getNextNonBlocked());
-				}
-			}
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer correctly aligns the streams for inputs with
-	 * multiple input channels, by buffering and blocking certain inputs.
-	 */
-	@Test
-	public void testMultiChannelWithBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint with blocked data
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(1, 0),
-					
-					// checkpoint without blocked data
-					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
-					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-					
-					// checkpoint with data only from one channel
-					createBuffer(2), createBuffer(2),
-					createBarrier(3, 2),
-					createBuffer(2), createBuffer(2),
-					createBarrier(3, 0), createBarrier(3, 1),
-					
-					// empty checkpoint
-					createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
-
-					// checkpoint with blocked data in mixed order
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(5, 1),
-					createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1),
-					createBarrier(5, 2),
-					createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1),
-					createBarrier(5, 0),
-					
-					// some trailing data
-					createBuffer(0),
-					createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-			
-			// pre checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			assertEquals(1L, handler.getNextExpectedCheckpointId());
-
-			// blocking while aligning for checkpoint 1
-			check(sequence[7], buffer.getNextNonBlocked());
-			assertEquals(1L, handler.getNextExpectedCheckpointId());
-
-			// checkpoint 1 done, returning buffered data
-			check(sequence[5], buffer.getNextNonBlocked());
-			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			check(sequence[6], buffer.getNextNonBlocked());
-
-			// pre checkpoint 2
-			check(sequence[9], buffer.getNextNonBlocked());
-			check(sequence[10], buffer.getNextNonBlocked());
-			check(sequence[11], buffer.getNextNonBlocked());
-			check(sequence[12], buffer.getNextNonBlocked());
-			check(sequence[13], buffer.getNextNonBlocked());
-			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			
-			// checkpoint 2 barriers come together
-			check(sequence[17], buffer.getNextNonBlocked());
-			assertEquals(3L, handler.getNextExpectedCheckpointId());
-			check(sequence[18], buffer.getNextNonBlocked());
-
-			// checkpoint 3 starts, data buffered
-			check(sequence[20], buffer.getNextNonBlocked());
-			assertEquals(4L, handler.getNextExpectedCheckpointId());
-			check(sequence[21], buffer.getNextNonBlocked());
-
-			// checkpoint 4 happens without extra data
-			
-			// pre checkpoint 5
-			check(sequence[27], buffer.getNextNonBlocked());
-			assertEquals(5L, handler.getNextExpectedCheckpointId());
-			check(sequence[28], buffer.getNextNonBlocked());
-			check(sequence[29], buffer.getNextNonBlocked());
-			
-			// checkpoint 5 aligning
-			check(sequence[31], buffer.getNextNonBlocked());
-			check(sequence[32], buffer.getNextNonBlocked());
-			check(sequence[33], buffer.getNextNonBlocked());
-			check(sequence[37], buffer.getNextNonBlocked());
-			
-			// buffered data from checkpoint 5 alignment
-			check(sequence[34], buffer.getNextNonBlocked());
-			check(sequence[36], buffer.getNextNonBlocked());
-			check(sequence[38], buffer.getNextNonBlocked());
-			check(sequence[39], buffer.getNextNonBlocked());
-			
-			// remaining data
-			check(sequence[41], buffer.getNextNonBlocked());
-			check(sequence[42], buffer.getNextNonBlocked());
-			check(sequence[43], buffer.getNextNonBlocked());
-			check(sequence[44], buffer.getNextNonBlocked());
-			
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiChannelTrailingBlockedData() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-					
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
-					createBarrier(2, 2),
-					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-
-			// pre-checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			assertEquals(1L, handler.getNextExpectedCheckpointId());
-
-			// pre-checkpoint 2
-			check(sequence[6], buffer.getNextNonBlocked());
-			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			check(sequence[7], buffer.getNextNonBlocked());
-			check(sequence[8], buffer.getNextNonBlocked());
-			
-			// checkpoint 2 alignment
-			check(sequence[13], buffer.getNextNonBlocked());
-			check(sequence[14], buffer.getNextNonBlocked());
-			check(sequence[18], buffer.getNextNonBlocked());
-			check(sequence[19], buffer.getNextNonBlocked());
-
-			// end of stream: remaining buffered contents
-			check(sequence[10], buffer.getNextNonBlocked());
-			check(sequence[11], buffer.getNextNonBlocked());
-			check(sequence[12], buffer.getNextNonBlocked());
-			check(sequence[16], buffer.getNextNonBlocked());
-			check(sequence[17], buffer.getNextNonBlocked());
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-			
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer correctly aligns the streams in cases
-	 * where some channels receive barriers from multiple successive checkpoints
-	 * before the pending checkpoint is complete.
-	 */
-	@Test
-	public void testMultiChannelWithQueuedFutureBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint 1 - with blocked data
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(1), createBuffer(0),
-
-					// checkpoint 2 - where future checkpoint barriers come before
-					// the current checkpoint is complete
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(2), createBarrier(2, 0),
-					createBarrier(3, 0), createBuffer(0),
-					createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(4, 1), createBuffer(1), createBuffer(2),
-
-					// complete checkpoint 2, send a barrier for checkpoints 4 and 5
-					createBarrier(2, 2),
-					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
-					createBarrier(4, 0),
-					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
-					createBarrier(5, 1),
-
-					// complete checkpoint 3
-					createBarrier(3, 2),
-					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
-					createBarrier(6, 1),
-					
-					// complete checkpoint 4, checkpoint 5 remains not fully triggered
-					createBarrier(4, 2),
-					createBuffer(2),
-					createBuffer(1), createEndOfPartition(1),
-					createBuffer(2), createEndOfPartition(2),
-					createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-
-			// around checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			check(sequence[7], buffer.getNextNonBlocked());
-			
-			check(sequence[5], buffer.getNextNonBlocked());
-			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			check(sequence[6], buffer.getNextNonBlocked());
-			check(sequence[9], buffer.getNextNonBlocked());
-			check(sequence[10], buffer.getNextNonBlocked());
-
-			// alignment of checkpoint 2 - buffering also some barriers for
-			// checkpoints 3 and 4
-			check(sequence[13], buffer.getNextNonBlocked());
-			check(sequence[20], buffer.getNextNonBlocked());
-			check(sequence[23], buffer.getNextNonBlocked());
-			
-			// checkpoint 2 completed
-			check(sequence[12], buffer.getNextNonBlocked());
-			check(sequence[25], buffer.getNextNonBlocked());
-			check(sequence[27], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
-			check(sequence[32], buffer.getNextNonBlocked());
-
-			// checkpoint 3 completed (emit buffered)
-			check(sequence[16], buffer.getNextNonBlocked());
-			check(sequence[18], buffer.getNextNonBlocked());
-			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[28], buffer.getNextNonBlocked());
-			
-			// past checkpoint 3
-			check(sequence[36], buffer.getNextNonBlocked());
-			check(sequence[38], buffer.getNextNonBlocked());
-
-			// checkpoint 4 completed (emit buffered)
-			check(sequence[22], buffer.getNextNonBlocked());
-			check(sequence[26], buffer.getNextNonBlocked());
-			check(sequence[31], buffer.getNextNonBlocked());
-			check(sequence[33], buffer.getNextNonBlocked());
-			check(sequence[39], buffer.getNextNonBlocked());
-			
-			// past checkpoint 4, alignment for checkpoint 5
-			check(sequence[42], buffer.getNextNonBlocked());
-			check(sequence[45], buffer.getNextNonBlocked());
-			check(sequence[46], buffer.getNextNonBlocked());
-			
-			// abort checkpoint 5 (end of partition)
-			check(sequence[37], buffer.getNextNonBlocked());
-			
-			// start checkpoint 6 alignment
-			check(sequence[47], buffer.getNextNonBlocked());
-			check(sequence[48], buffer.getNextNonBlocked());
-			
-			// end of input, emit remainder
-			check(sequence[43], buffer.getNextNonBlocked());
-			check(sequence[44], buffer.getNextNonBlocked());
-			
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer skips over the current checkpoint if it
-	 * receives a barrier from a later checkpoint on a non-blocked input.
-	 */
-	@Test
-	public void testMultiChannelSkippingCheckpoints() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint 1 - with blocked data
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(1), createBuffer(0),
-
-					// checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(2),
-					createBarrier(2, 0),
-					createBuffer(2), createBuffer(0),
-					createBarrier(3, 2),
-					
-					createBuffer(2),
-					createBuffer(1), createEndOfPartition(1),
-					createBuffer(2), createEndOfPartition(2),
-					createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-
-			// checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			check(sequence[7], buffer.getNextNonBlocked());
-			assertEquals(1L, buffer.getCurrentCheckpointId());
-			
-			check(sequence[5], buffer.getNextNonBlocked());
-			check(sequence[6], buffer.getNextNonBlocked());
-			check(sequence[9], buffer.getNextNonBlocked());
-			check(sequence[10], buffer.getNextNonBlocked());
-
-			// alignment of checkpoint 2
-			check(sequence[13], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
-			check(sequence[15], buffer.getNextNonBlocked());
-
-			// checkpoint 2 aborted, checkpoint 3 started
-			check(sequence[12], buffer.getNextNonBlocked());
-			assertEquals(3L, buffer.getCurrentCheckpointId());
-			check(sequence[16], buffer.getNextNonBlocked());
-			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[20], buffer.getNextNonBlocked());
-			
-			// checkpoint 3 aborted (end of partition)
-			check(sequence[18], buffer.getNextNonBlocked());
-			check(sequence[21], buffer.getNextNonBlocked());
-			check(sequence[22], buffer.getNextNonBlocked());
-			check(sequence[23], buffer.getNextNonBlocked());
-			check(sequence[24], buffer.getNextNonBlocked());
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-			
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer skips over the current checkpoint if it
-	 * receives a barrier from a later checkpoint on a non-blocked input.
-	 */
-	@Test
-	public void testMultiChannelJumpingOverCheckpoint() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint 1 - with blocked data
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(1), createBuffer(0),
-
-					// checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(2),
-					createBarrier(2, 0),
-					createBuffer(2), createBuffer(0),
-					createBarrier(3, 1),
-					createBuffer(1), createBuffer(2),
-					createBarrier(3, 0),
-					createBuffer(2), createBuffer(0),
-					createBarrier(4, 2),
-
-					createBuffer(2),
-					createBuffer(1), createEndOfPartition(1),
-					createBuffer(2), createEndOfPartition(2),
-					createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-
-			// checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			check(sequence[7], buffer.getNextNonBlocked());
-			assertEquals(1L, buffer.getCurrentCheckpointId());
-
-			check(sequence[5], buffer.getNextNonBlocked());
-			check(sequence[6], buffer.getNextNonBlocked());
-			check(sequence[9], buffer.getNextNonBlocked());
-			check(sequence[10], buffer.getNextNonBlocked());
-
-			// alignment of checkpoint 2
-			check(sequence[13], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
-			check(sequence[15], buffer.getNextNonBlocked());
-			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[21], buffer.getNextNonBlocked());
-
-			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
-			check(sequence[12], buffer.getNextNonBlocked());
-			assertEquals(4L, buffer.getCurrentCheckpointId());
-			check(sequence[16], buffer.getNextNonBlocked());
-			check(sequence[18], buffer.getNextNonBlocked());
-			check(sequence[22], buffer.getNextNonBlocked());
-			
-			// align checkpoint 4 remainder
-			check(sequence[25], buffer.getNextNonBlocked());
-			check(sequence[26], buffer.getNextNonBlocked());
-			
-			// checkpoint 4 aborted (due to end of partition)
-			check(sequence[24], buffer.getNextNonBlocked());
-			check(sequence[27], buffer.getNextNonBlocked());
-			check(sequence[28], buffer.getNextNonBlocked());
-			check(sequence[29], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * Validates that the buffer skips over a later checkpoint if it
-	 * receives a barrier from an even later checkpoint on a blocked input.
-	 */
-	@Test
-	public void testMultiChannelSkippingCheckpointsViaBlockedInputs() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint 1 - with blocked data
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(1), createBuffer(0),
-
-					// checkpoint 2 will not complete: pre-mature barrier from checkpoint 3
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(2),
-					createBarrier(2, 0),
-					createBuffer(1), createBuffer(0),
-
-					createBarrier(3, 0), // queued barrier on blocked input
-					createBuffer(0),
-					
-					createBarrier(4, 1), // pre-mature barrier on blocked input
-					createBuffer(1),
-					createBuffer(0),
-					createBuffer(2),
-
-					// complete checkpoint 2
-					createBarrier(2, 2),
-					createBuffer(0),
-					
-					createBarrier(3, 2), // should be ignored
-					createBuffer(2),
-					createBarrier(4, 0),
-					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(4, 2),
-					
-					createBuffer(1), createEndOfPartition(1),
-					createBuffer(2), createEndOfPartition(2),
-					createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			// checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			check(sequence[7], buffer.getNextNonBlocked());
-			assertEquals(1L, buffer.getCurrentCheckpointId());
-			check(sequence[5], buffer.getNextNonBlocked());
-			check(sequence[6], buffer.getNextNonBlocked());
-			check(sequence[9], buffer.getNextNonBlocked());
-			check(sequence[10], buffer.getNextNonBlocked());
-
-			// alignment of checkpoint 2
-			check(sequence[13], buffer.getNextNonBlocked());
-			check(sequence[22], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
-
-			// checkpoint 2 completed
-			check(sequence[12], buffer.getNextNonBlocked());
-			check(sequence[15], buffer.getNextNonBlocked());
-			check(sequence[16], buffer.getNextNonBlocked());
-			
-			// checkpoint 3 skipped, alignment for 4 started
-			check(sequence[18], buffer.getNextNonBlocked());
-			assertEquals(4L, buffer.getCurrentCheckpointId());
-			check(sequence[21], buffer.getNextNonBlocked());
-			check(sequence[24], buffer.getNextNonBlocked());
-			check(sequence[26], buffer.getNextNonBlocked());
-			check(sequence[30], buffer.getNextNonBlocked());
-			
-			// checkpoint 4 completed
-			check(sequence[20], buffer.getNextNonBlocked());
-			check(sequence[28], buffer.getNextNonBlocked());
-			check(sequence[29], buffer.getNextNonBlocked());
-			
-			check(sequence[32], buffer.getNextNonBlocked());
-			check(sequence[33], buffer.getNextNonBlocked());
-			check(sequence[34], buffer.getNextNonBlocked());
-			check(sequence[35], buffer.getNextNonBlocked());
-			check(sequence[36], buffer.getNextNonBlocked());
-			check(sequence[37], buffer.getNextNonBlocked());
-			
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testEarlyCleanup() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-
-					createBuffer(2), createBuffer(1), createBuffer(0),
-					createBarrier(2, 1),
-					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
-					createBarrier(2, 2),
-					createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
-
-			// pre-checkpoint 1
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			assertEquals(1L, handler.getNextExpectedCheckpointId());
-
-			// pre-checkpoint 2
-			check(sequence[6], buffer.getNextNonBlocked());
-			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			check(sequence[7], buffer.getNextNonBlocked());
-			check(sequence[8], buffer.getNextNonBlocked());
-
-			// checkpoint 2 alignment
-			check(sequence[13], buffer.getNextNonBlocked());
-			check(sequence[14], buffer.getNextNonBlocked());
-			check(sequence[18], buffer.getNextNonBlocked());
-			check(sequence[19], buffer.getNextNonBlocked());
-
-			// end of stream: remaining buffered contents
-			buffer.getNextNonBlocked();
-			buffer.cleanup();
-
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testStartAlignmentWithClosedChannels() {
-		try {
-			BufferOrEvent[] sequence = {
-					// close some channels immediately 
-					createEndOfPartition(2), createEndOfPartition(1),
-
-					// checkpoint without blocked data
-					createBuffer(0), createBuffer(0), createBuffer(3),
-					createBarrier(2, 3), createBarrier(2, 0),
-
-					// checkpoint with blocked data
-					createBuffer(3), createBuffer(0),
-					createBarrier(3, 3),
-					createBuffer(3), createBuffer(0),
-					createBarrier(3, 0),
-
-					// empty checkpoint
-					createBarrier(4, 0), createBarrier(4, 3),
-					
-					// some data, one channel closes
-					createBuffer(0), createBuffer(0), createBuffer(3),
-					createEndOfPartition(0),
-					
-					// checkpoint on last remaining channel
-					createBuffer(3),
-					createBarrier(5, 3),
-					createBuffer(3),
-					createEndOfPartition(3)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
-			
-			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
-			
-			// pre checkpoint 2
-			check(sequence[0], buffer.getNextNonBlocked());
-			check(sequence[1], buffer.getNextNonBlocked());
-			check(sequence[2], buffer.getNextNonBlocked());
-			check(sequence[3], buffer.getNextNonBlocked());
-			check(sequence[4], buffer.getNextNonBlocked());
-
-			// checkpoint 3 alignment
-			check(sequence[7], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
-			check(sequence[8], buffer.getNextNonBlocked());
-			check(sequence[11], buffer.getNextNonBlocked());
-
-			// checkpoint 3 buffered
-			check(sequence[10], buffer.getNextNonBlocked());
-			assertEquals(3L, buffer.getCurrentCheckpointId());
-
-			// after checkpoint 4
-			check(sequence[15], buffer.getNextNonBlocked());
-			assertEquals(4L, buffer.getCurrentCheckpointId());
-			check(sequence[16], buffer.getNextNonBlocked());
-			check(sequence[17], buffer.getNextNonBlocked());
-			check(sequence[18], buffer.getNextNonBlocked());
-
-			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[21], buffer.getNextNonBlocked());
-			assertEquals(5L, buffer.getCurrentCheckpointId());
-			check(sequence[22], buffer.getNextNonBlocked());
-
-			assertNull(buffer.getNextNonBlocked());
-			assertNull(buffer.getNextNonBlocked());
-			
-			buffer.cleanup();
-			
-			checkNoTempFilesRemain();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testEndOfStreamWhileCheckpoint() {
-		
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-
-	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
-	}
-
-	private static BufferOrEvent createBuffer(int channel) {
-		// since we have no access to the contents, we need to use the size as an
-		// identifier to validate correctness here
-		Buffer buf = new Buffer(
-				MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
-				FreeingBufferRecycler.INSTANCE);
-		
-		buf.setSize(SIZE_COUNTER++);
-		return new BufferOrEvent(buf, channel);
-	}
-
-	private static BufferOrEvent createEndOfPartition(int channel) {
-		return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel);
-	}
-	
-	private static void check(BufferOrEvent expected, BufferOrEvent present) {
-		assertNotNull(expected);
-		assertNotNull(present);
-		assertEquals(expected.isBuffer(), present.isBuffer());
-		
-		if (expected.isBuffer()) {
-			// since we have no access to the contents, we need to use the size as an
-			// identifier to validate correctness here
-			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
-		}
-		else {
-			assertEquals(expected.getEvent(), present.getEvent());
-		}
-	}
-	
-	private static void checkNoTempFilesRemain() {
-		// validate that all temp files have been removed
-		for (File dir : IO_MANAGER.getSpillingDirectories()) {
-			for (String file : dir.list()) {
-				if (file != null && !(file.equals(".") || file.equals(".."))) {
-					fail("barrier buffer did not clean up temp files. remaining file: " + file);
-				}
-			}
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Testing Mocks
-	// ------------------------------------------------------------------------
-
-	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
-		
-		private long nextExpectedCheckpointId = -1L;
-
-		public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) {
-			this.nextExpectedCheckpointId = nextExpectedCheckpointId;
-		}
-
-		public long getNextExpectedCheckpointId() {
-			return nextExpectedCheckpointId;
-		}
-
-		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertNotNull(barrier);
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
-			nextExpectedCheckpointId++;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
deleted file mode 100644
index b9b6e5f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the behavior of the barrier tracker.
- */
-public class BarrierTrackerTest {
-	
-	private static final int PAGE_SIZE = 512;
-	
-	@Test
-	public void testSingleChannelNoBarriers() {
-		try {
-			BufferOrEvent[] sequence = { createBuffer(0), createBuffer(0), createBuffer(0) };
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			for (BufferOrEvent boe : sequence) {
-				assertEquals(boe, tracker.getNextNonBlocked());
-			}
-			
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiChannelNoBarriers() {
-		try {
-			BufferOrEvent[] sequence = { createBuffer(2), createBuffer(2), createBuffer(0),
-					createBuffer(1), createBuffer(0), createBuffer(3),
-					createBuffer(1), createBuffer(1), createBuffer(2)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			for (BufferOrEvent boe : sequence) {
-				assertEquals(boe, tracker.getNextNonBlocked());
-			}
-
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSingleChannelWithBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(0), createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0),
-					createBarrier(2, 0), createBarrier(3, 0),
-					createBuffer(0), createBuffer(0),
-					createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0),
-					createBuffer(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			CheckpointSequenceValidator validator =
-					new CheckpointSequenceValidator(1, 2, 3, 4, 5, 6);
-			tracker.registerCheckpointEventHandler(validator);
-			
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, tracker.getNextNonBlocked());
-				}
-			}
-
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSingleChannelWithSkippedBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0),
-					createBarrier(1, 0),
-					createBuffer(0), createBuffer(0),
-					createBarrier(3, 0), createBuffer(0),
-					createBarrier(4, 0), createBarrier(6, 0), createBuffer(0),
-					createBarrier(7, 0), createBuffer(0), createBarrier(10, 0),
-					createBuffer(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			CheckpointSequenceValidator validator =
-					new CheckpointSequenceValidator(1, 3, 4, 6, 7, 10);
-			tracker.registerCheckpointEventHandler(validator);
-
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, tracker.getNextNonBlocked());
-				}
-			}
-
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiChannelWithBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1),
-					createBarrier(1, 0),
-					
-					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
-					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-					
-					createBuffer(2), createBuffer(2),
-					createBarrier(3, 2),
-					createBuffer(2), createBuffer(2),
-					createBarrier(3, 0), createBarrier(3, 1),
-					
-					createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0),
-					
-					createBuffer(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			CheckpointSequenceValidator validator =
-					new CheckpointSequenceValidator(1, 2, 3, 4);
-			tracker.registerCheckpointEventHandler(validator);
-
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, tracker.getNextNonBlocked());
-				}
-			}
-
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultiChannelSkippingCheckpoints() {
-		try {
-			BufferOrEvent[] sequence = {
-					createBuffer(0), createBuffer(2), createBuffer(0),
-					createBarrier(1, 1), createBarrier(1, 2),
-					createBuffer(2), createBuffer(1),
-					createBarrier(1, 0),
-
-					createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2),
-					createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2),
-
-					createBuffer(2), createBuffer(2),
-					createBarrier(3, 2),
-					createBuffer(2), createBuffer(2),
-					
-					// jump to checkpoint 4
-					createBarrier(4, 0),
-					createBuffer(0), createBuffer(1), createBuffer(2),
-					createBarrier(4, 1),
-					createBuffer(1),
-					createBarrier(4, 2),
-					
-					createBuffer(0)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			CheckpointSequenceValidator validator =
-					new CheckpointSequenceValidator(1, 2, 4);
-			tracker.registerCheckpointEventHandler(validator);
-
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, tracker.getNextNonBlocked());
-				}
-			}
-			
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	/**
-	 * This test validates that the barrier tracker does not immediately
-	 * discard a pending checkpoint as soon as it sees a barrier from a
-	 * later checkpoint from some channel.
-	 * 
-	 * This behavior is crucial, otherwise topologies where different inputs
-	 * have different latency (and that latency is close to or higher than the
-	 * checkpoint interval) may skip many checkpoints, or fail to complete a
-	 * checkpoint all together.
-	 */
-	@Test
-	public void testCompleteCheckpointsOnLateBarriers() {
-		try {
-			BufferOrEvent[] sequence = {
-					// checkpoint 2
-					createBuffer(1), createBuffer(1), createBuffer(0), createBuffer(2),
-					createBarrier(2, 1), createBarrier(2, 0), createBarrier(2, 2),
-					
-					// incomplete checkpoint 3
-					createBuffer(1), createBuffer(0),
-					createBarrier(3, 1), createBarrier(3, 2),
-					
-					// some barriers from checkpoint 4
-					createBuffer(1), createBuffer(0),
-					createBarrier(4, 2), createBarrier(4, 1),
-					createBuffer(1), createBuffer(2),
-	
-					// last barrier from checkpoint 3
-					createBarrier(3, 0),
-					
-					// complete checkpoint 4
-					createBuffer(0), createBarrier(4, 0),
-					
-					// regular checkpoint 5
-					createBuffer(1), createBuffer(2), createBarrier(5, 1), 
-					createBuffer(0), createBarrier(5, 0),
-					createBuffer(1), createBarrier(5, 2),
-					
-					// checkpoint 6 (incomplete),
-					createBuffer(1), createBarrier(6, 1),
-					createBuffer(0), createBarrier(6, 0),
-					
-					// checkpoint 7, with early barriers for checkpoints 8 and 9
-					createBuffer(1), createBarrier(7, 1),
-					createBuffer(0), createBarrier(7, 2),
-					createBuffer(2), createBarrier(8, 2), 
-					createBuffer(0), createBarrier(8, 1),
-					createBuffer(1), createBarrier(9, 1),
-					
-					// complete checkpoint 7, first barriers from checkpoint 10
-					createBarrier(7, 0),
-					createBuffer(0), createBarrier(9, 2),
-					createBuffer(2), createBarrier(10, 2),
-					
-					// complete checkpoint 8 and 9
-					createBarrier(8, 0),
-					createBuffer(1), createBuffer(2), createBarrier(9, 0),
-					
-					// trailing data
-					createBuffer(1), createBuffer(0), createBuffer(2)
-			};
-
-			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
-			BarrierTracker tracker = new BarrierTracker(gate);
-
-			CheckpointSequenceValidator validator =
-					new CheckpointSequenceValidator(2, 3, 4, 5, 7, 8, 9);
-			tracker.registerCheckpointEventHandler(validator);
-
-			for (BufferOrEvent boe : sequence) {
-				if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) {
-					assertEquals(boe, tracker.getNextNonBlocked());
-				}
-			}
-
-			assertNull(tracker.getNextNonBlocked());
-			assertNull(tracker.getNextNonBlocked());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-
-	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
-	}
-
-	private static BufferOrEvent createBuffer(int channel) {
-		return new BufferOrEvent(
-				new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE), channel);
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Testing Mocks
-	// ------------------------------------------------------------------------
-	
-	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
-
-		private final long[] checkpointIDs;
-		
-		private int i = 0;
-
-		private CheckpointSequenceValidator(long... checkpointIDs) {
-			this.checkpointIDs = checkpointIDs;
-		}
-		
-		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertNotNull(barrier);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
deleted file mode 100644
index e85eddb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Random;
-
-
-import static org.junit.Assert.*;
-
-public class BufferSpillerTest {
-	
-	private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class);
-
-	private static final int PAGE_SIZE = 4096;
-
-	private static IOManager IO_MANAGER;
-
-	private BufferSpiller spiller;
-
-
-	// ------------------------------------------------------------------------
-	//  Setup / Cleanup
-	// ------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void setupIOManager() {
-		IO_MANAGER = new IOManagerAsync();
-	}
-
-	@AfterClass
-	public static void shutdownIOManager() {
-		IO_MANAGER.shutdown();
-	}
-	
-	@Before
-	public void createSpiller() {
-		try {
-			spiller = new BufferSpiller(IO_MANAGER, PAGE_SIZE);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Cannot create BufferSpiller: " + e.getMessage());
-		}
-	}
-	
-	@After
-	public void cleanupSpiller() {
-		if (spiller != null) {
-			try {
-				spiller.close();
-			}
-			catch (Exception e) {
-				e.printStackTrace();
-				fail("Cannot properly close the BufferSpiller: " + e.getMessage());
-			}
-			
-			assertFalse(spiller.getCurrentChannel().isOpen());
-			assertFalse(spiller.getCurrentSpillFile().exists());
-		}
-		
-		checkNoTempFilesRemain();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Tests
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testRollOverEmptySequences() {
-		try {
-			assertNull(spiller.rollOver());
-			assertNull(spiller.rollOver());
-			assertNull(spiller.rollOver());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSpillAndRollOverSimple() {
-		try {
-			final Random rnd = new Random();
-			final Random bufferRnd = new Random();
-
-			final int maxNumEventsAndBuffers = 3000;
-			final int maxNumChannels = 1656;
-
-			// do multiple spilling / rolling over rounds
-			for (int round = 0; round < 5; round++) {
-				
-				final long bufferSeed = rnd.nextLong();
-				bufferRnd.setSeed(bufferSeed);
-				
-				final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
-				final int numChannels = rnd.nextInt(maxNumChannels) + 1;
-				
-				final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-
-				// generate sequence
-				for (int i = 0; i < numEventsAndBuffers; i++) {
-					boolean isEvent = rnd.nextDouble() < 0.05d;
-					if (isEvent) {
-						BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
-						events.add(evt);
-						spiller.add(evt);
-					}
-					else {
-						BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
-						spiller.add(evt);
-					}
-				}
-
-				// reset and create reader
-				bufferRnd.setSeed(bufferSeed);
-			
-				BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
-				seq.open();
-
-				// read and validate the sequence
-
-				int numEvent = 0;
-				for (int i = 0; i < numEventsAndBuffers; i++) {
-					BufferOrEvent next = seq.getNext();
-					assertNotNull(next);
-					if (next.isEvent()) {
-						BufferOrEvent expected = events.get(numEvent++);
-						assertEquals(expected.getEvent(), next.getEvent());
-						assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-					}
-					else {
-						validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
-					}
-				}
-
-				// no further data
-				assertNull(seq.getNext());
-
-				// all events need to be consumed
-				assertEquals(events.size(), numEvent);
-				
-				seq.cleanup();
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testSpillWhileReading() {
-		LOG.info("Starting SpillWhileReading test");
-		
-		try {
-			final int sequences = 10;
-			
-			final Random rnd = new Random();
-			
-			final int maxNumEventsAndBuffers = 30000;
-			final int maxNumChannels = 1656;
-			
-			int sequencesConsumed = 0;
-			
-			ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>();
-			SequenceToConsume currentSequence = null;
-			int currentNumEvents = 0;
-			int currentNumRecordAndEvents = 0;
-			
-			// do multiple spilling / rolling over rounds
-			for (int round = 0; round < 2*sequences; round++) {
-
-				if (round % 2 == 1) {
-					// make this an empty sequence
-					assertNull(spiller.rollOver());
-				}
-				else {
-					// proper spilled sequence
-					final long bufferSeed = rnd.nextLong();
-					final Random bufferRnd = new Random(bufferSeed);
-					
-					final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1;
-					final int numChannels = rnd.nextInt(maxNumChannels) + 1;
-	
-					final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-	
-					int generated = 0;
-					while (generated < numEventsAndBuffers) {
-						
-						if (currentSequence == null || rnd.nextDouble() < 0.5) {
-							// add a new record
-							boolean isEvent = rnd.nextDouble() < 0.05;
-							if (isEvent) {
-								BufferOrEvent evt = generateRandomEvent(rnd, numChannels);
-								events.add(evt);
-								spiller.add(evt);
-							}
-							else {
-								BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels));
-								spiller.add(evt);
-							}
-							generated++;
-						}
-						else {
-							// consume a record
-							BufferOrEvent next = currentSequence.sequence.getNext();
-							assertNotNull(next);
-							if (next.isEvent()) {
-								BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
-								assertEquals(expected.getEvent(), next.getEvent());
-								assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-							}
-							else {
-								Random validationRnd = currentSequence.bufferRnd;
-								validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
-							}
-							
-							currentNumRecordAndEvents++;
-							if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
-								// done with the sequence
-								currentSequence.sequence.cleanup();
-								sequencesConsumed++;
-								
-								// validate we had all events
-								assertEquals(currentSequence.events.size(), currentNumEvents);
-								
-								// reset
-								currentSequence = pendingSequences.pollFirst();
-								if (currentSequence != null) {
-									currentSequence.sequence.open();
-								}
-								
-								currentNumRecordAndEvents = 0;
-								currentNumEvents = 0;
-							}
-						}
-					}
-	
-					// done generating a sequence. queue it for consumption
-					bufferRnd.setSeed(bufferSeed);
-					BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver();
-					
-					SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels);
-					
-					if (currentSequence == null) {
-						currentSequence = stc;
-						stc.sequence.open();
-					}
-					else {
-						pendingSequences.addLast(stc);
-					}
-				}
-			}
-			
-			// consume all the remainder
-			while (currentSequence != null) {
-				// consume a record
-				BufferOrEvent next = currentSequence.sequence.getNext();
-				assertNotNull(next);
-				if (next.isEvent()) {
-					BufferOrEvent expected = currentSequence.events.get(currentNumEvents++);
-					assertEquals(expected.getEvent(), next.getEvent());
-					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-				}
-				else {
-					Random validationRnd = currentSequence.bufferRnd;
-					validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels));
-				}
-
-				currentNumRecordAndEvents++;
-				if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) {
-					// done with the sequence
-					currentSequence.sequence.cleanup();
-					sequencesConsumed++;
-
-					// validate we had all events
-					assertEquals(currentSequence.events.size(), currentNumEvents);
-
-					// reset
-					currentSequence = pendingSequences.pollFirst();
-					if (currentSequence != null) {
-						currentSequence.sequence.open();
-					}
-
-					currentNumRecordAndEvents = 0;
-					currentNumEvents = 0;
-				}
-			}
-			
-			assertEquals(sequences, sequencesConsumed);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-	
-	private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) {
-		long magicNumber = rnd.nextLong();
-		byte[] data = new byte[rnd.nextInt(1000)];
-		rnd.nextBytes(data);
-		TestEvent evt = new TestEvent(magicNumber, data);
-
-		int channelIndex = rnd.nextInt(numChannels);
-		
-		return new BufferOrEvent(evt, channelIndex);
-	}
-
-	private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) {
-		MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
-		for (int i = 0; i < size; i++) {
-			seg.put(i, (byte) i);
-		}
-		
-		Buffer buf = new Buffer(seg, FreeingBufferRecycler.INSTANCE);
-		buf.setSize(size);
-		return new BufferOrEvent(buf, channelIndex);
-	}
-
-	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
-		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
-		assertTrue("is not buffer", boe.isBuffer());
-
-		Buffer buf = boe.getBuffer();
-		assertEquals("wrong buffer size", expectedSize, buf.getSize());
-
-		MemorySegment seg = buf.getMemorySegment();
-		for (int i = 0; i < expectedSize; i++) {
-			byte expected = (byte) i;
-			if (expected != seg.get(i)) {
-				fail(String.format(
-						"wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i)));
-			}
-		}
-	}
-
-	private static void checkNoTempFilesRemain() {
-		// validate that all temp files have been removed
-		for (File dir : IO_MANAGER.getSpillingDirectories()) {
-			for (String file : dir.list()) {
-				if (file != null && !(file.equals(".") || file.equals(".."))) {
-					fail("barrier buffer did not clean up temp files. remaining file: " + file);
-				}
-			}
-		}
-	}
-	
-	private static class SequenceToConsume {
-
-		final BufferSpiller.SpilledBufferOrEventSequence sequence;
-		final ArrayList<BufferOrEvent> events;
-		final Random bufferRnd;
-		final int numBuffersAndEvents;
-		final int numChannels;
-
-		private SequenceToConsume(Random bufferRnd, ArrayList<BufferOrEvent> events,
-									BufferSpiller.SpilledBufferOrEventSequence sequence,
-									int numBuffersAndEvents, int numChannels) {
-			this.bufferRnd = bufferRnd;
-			this.events = events;
-			this.sequence = sequence;
-			this.numBuffersAndEvents = numBuffersAndEvents;
-			this.numChannels = numChannels;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
deleted file mode 100644
index cb8a058..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/MockInputGate.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.runtime.event.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-
-import java.util.ArrayDeque;
-import java.util.List;
-import java.util.Queue;
-
-public class MockInputGate implements InputGate {
-
-	private final int pageSize;
-	
-	private final int numChannels;
-	
-	private final Queue<BufferOrEvent> boes;
-
-	private final boolean[] closed;
-	
-	private int closedChannels;
-
-	
-	public MockInputGate(int pageSize, int numChannels, List<BufferOrEvent> boes) {
-		this.pageSize = pageSize;
-		this.numChannels = numChannels;
-		this.boes = new ArrayDeque<BufferOrEvent>(boes);
-		this.closed = new boolean[numChannels];
-	}
-
-	@Override
-	public int getPageSize() {
-		return pageSize;
-	}
-	
-	@Override
-	public int getNumberOfInputChannels() {
-		return numChannels;
-	}
-
-	@Override
-	public boolean isFinished() {
-		return boes.isEmpty();
-	}
-
-	@Override
-	public BufferOrEvent getNextBufferOrEvent() {
-		BufferOrEvent next = boes.poll();
-		if (next == null) {
-			return null;
-		}
-		
-		int channelIdx = next.getChannelIndex();
-		if (closed[channelIdx]) {
-			throw new RuntimeException("Inconsistent: Channel " + channelIdx
-					+ " has data even though it is already closed.");
-		}
-		if (next.isEvent() && next.getEvent() instanceof EndOfPartitionEvent) {
-			closed[channelIdx] = true;
-			closedChannels++;
-		}
-		return next;
-	}
-
-	@Override
-	public void requestPartitions() {}
-
-	@Override
-	public void sendTaskEvent(TaskEvent event) {}
-
-	@Override
-	public void registerListener(EventListener<InputGate> listener) {}
-	
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
deleted file mode 100644
index 991b033..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpilledBufferOrEventSequenceTest.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.streaming.runtime.io.BufferSpiller.SpilledBufferOrEventSequence;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.nio.channels.FileChannel;
-import java.util.ArrayList;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests that validate the behavior of the {@link SpilledBufferOrEventSequence} in isolation,
- * with respect to detecting corrupt sequences, trailing data, and interleaved buffers and events.
- */
-public class SpilledBufferOrEventSequenceTest {
-	
-	private final ByteBuffer buffer = ByteBuffer.allocateDirect(128 * 1024).order(ByteOrder.LITTLE_ENDIAN);
-	private final int pageSize = 32*1024;
-	
-	private File tempFile;
-	private FileChannel fileChannel;
-	
-	
-	@Before
-	public void initTempChannel() {
-		try {
-			tempFile = File.createTempFile("testdata", "tmp");
-			fileChannel = new RandomAccessFile(tempFile, "rw").getChannel();
-		}
-		catch (Exception e) {
-			cleanup();
-		}
-	}
-	
-	@After
-	public void cleanup() {
-		if (fileChannel != null) {
-			try {
-				fileChannel.close();
-			}
-			catch (IOException e) {
-				// ignore
-			}
-		}
-		if (tempFile != null) {
-			//noinspection ResultOfMethodCallIgnored
-			tempFile.delete();
-		}
-	}
-	
-	
-	// ------------------------------------------------------------------------
-	//  Tests
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testEmptyChannel() {
-		try {
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			assertNull(seq.getNext());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testIncompleteHeaderOnFirstElement() {
-		try {
-			ByteBuffer buf = ByteBuffer.allocate(7);
-			buf.order(ByteOrder.LITTLE_ENDIAN);
-			
-			fileChannel.write(buf);
-			fileChannel.position(0);
-			
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			try {
-				seq.getNext();
-				fail("should fail with an exception");
-			}
-			catch (IOException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testBufferSequence() {
-		try {
-			final Random rnd = new Random();
-			final long seed = rnd.nextLong();
-			
-			final int numBuffers = 325;
-			final int numChannels = 671;
-			
-			rnd.setSeed(seed);
-			
-			for (int i = 0; i < numBuffers; i++) {
-				writeBuffer(fileChannel, rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
-			}
-
-			fileChannel.position(0L);
-			rnd.setSeed(seed);
-
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			for (int i = 0; i < numBuffers; i++) {
-				validateBuffer(seq.getNext(), rnd.nextInt(pageSize) + 1, rnd.nextInt(numChannels));
-			}
-			
-			// should have no more data
-			assertNull(seq.getNext());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	@Test
-	public void testBufferSequenceWithIncompleteBuffer() {
-		try {
-			writeBuffer(fileChannel, 1672, 7);
-			
-			// write an incomplete buffer
-			ByteBuffer data = ByteBuffer.allocate(615);
-			data.order(ByteOrder.LITTLE_ENDIAN);
-			
-			data.putInt(2);
-			data.putInt(999);
-			data.put((byte) 0);
-			data.position(0);
-			data.limit(312);
-			fileChannel.write(data);
-			fileChannel.position(0L);
-
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			// first one is valid
-			validateBuffer(seq.getNext(), 1672, 7);
-			
-			// next one should fail
-			try {
-				seq.getNext();
-				fail("should fail with an exception");
-			}
-			catch (IOException e) {
-				// expected
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testEventSequence() {
-		try {
-			final Random rnd = new Random();
-			final int numEvents = 3000;
-			final int numChannels = 1656;
-			
-			final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(numEvents);
-			
-			for (int i = 0; i < numEvents; i++) {
-				events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
-			}
-
-			fileChannel.position(0L);
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			int i = 0;
-			BufferOrEvent boe;
-			while ((boe = seq.getNext()) != null) {
-				BufferOrEvent expected = events.get(i);
-				assertTrue(boe.isEvent());
-				assertEquals(expected.getEvent(), boe.getEvent());
-				assertEquals(expected.getChannelIndex(), boe.getChannelIndex());
-				i++;
-			}
-			
-			assertEquals(numEvents, i);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMixedSequence() {
-		try {
-			final Random rnd = new Random();
-			final Random bufferRnd = new Random();
-
-			final long bufferSeed = rnd.nextLong();
-			bufferRnd.setSeed(bufferSeed);
-			
-			final int numEventsAndBuffers = 3000;
-			final int numChannels = 1656;
-
-			final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128);
-
-			// generate sequence
-			
-			for (int i = 0; i < numEventsAndBuffers; i++) {
-				boolean isEvent = rnd.nextDouble() < 0.05d;
-				if (isEvent) {
-					events.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
-				}
-				else {
-					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-			
-			// reset and create reader
-			
-			fileChannel.position(0L);
-			bufferRnd.setSeed(bufferSeed);
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			
-			// read and validate the sequence
-			
-			int numEvent = 0;
-			for (int i = 0; i < numEventsAndBuffers; i++) {
-				BufferOrEvent next = seq.getNext();
-				if (next.isEvent()) {
-					BufferOrEvent expected = events.get(numEvent++);
-					assertEquals(expected.getEvent(), next.getEvent());
-					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-				}
-				else {
-					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-			
-			// no further data
-			assertNull(seq.getNext());
-			
-			// all events need to be consumed
-			assertEquals(events.size(), numEvent);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testMultipleSequences() {
-		File secondFile = null;
-		FileChannel secondChannel = null;
-		
-		try {
-			// create the second file channel
-			secondFile = File.createTempFile("testdata", "tmp");
-			secondChannel = new RandomAccessFile(secondFile, "rw").getChannel();
-			
-			final Random rnd = new Random();
-			final Random bufferRnd = new Random();
-
-			final long bufferSeed = rnd.nextLong();
-			bufferRnd.setSeed(bufferSeed);
-
-			final int numEventsAndBuffers1 = 272;
-			final int numEventsAndBuffers2 = 151;
-			
-			final int numChannels = 1656;
-
-			final ArrayList<BufferOrEvent> events1 = new ArrayList<BufferOrEvent>(128);
-			final ArrayList<BufferOrEvent> events2 = new ArrayList<BufferOrEvent>(128);
-
-			// generate sequence 1
-
-			for (int i = 0; i < numEventsAndBuffers1; i++) {
-				boolean isEvent = rnd.nextDouble() < 0.05d;
-				if (isEvent) {
-					events1.add(generateAndWriteEvent(fileChannel, rnd, numChannels));
-				}
-				else {
-					writeBuffer(fileChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-
-			// generate sequence 2
-
-			for (int i = 0; i < numEventsAndBuffers2; i++) {
-				boolean isEvent = rnd.nextDouble() < 0.05d;
-				if (isEvent) {
-					events2.add(generateAndWriteEvent(secondChannel, rnd, numChannels));
-				}
-				else {
-					writeBuffer(secondChannel, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-
-			// reset and create reader
-
-			fileChannel.position(0L);
-			secondChannel.position(0L);
-			
-			bufferRnd.setSeed(bufferSeed);
-			
-			SpilledBufferOrEventSequence seq1 = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			SpilledBufferOrEventSequence seq2 = new SpilledBufferOrEventSequence(secondFile, secondChannel, buffer, pageSize);
-
-			// read and validate the sequence 1
-			seq1.open();
-
-			int numEvent = 0;
-			for (int i = 0; i < numEventsAndBuffers1; i++) {
-				BufferOrEvent next = seq1.getNext();
-				if (next.isEvent()) {
-					BufferOrEvent expected = events1.get(numEvent++);
-					assertEquals(expected.getEvent(), next.getEvent());
-					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-				}
-				else {
-					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-			assertNull(seq1.getNext());
-			assertEquals(events1.size(), numEvent);
-
-			// read and validate the sequence 2
-			seq2.open();
-
-			numEvent = 0;
-			for (int i = 0; i < numEventsAndBuffers2; i++) {
-				BufferOrEvent next = seq2.getNext();
-				if (next.isEvent()) {
-					BufferOrEvent expected = events2.get(numEvent++);
-					assertEquals(expected.getEvent(), next.getEvent());
-					assertEquals(expected.getChannelIndex(), next.getChannelIndex());
-				}
-				else {
-					validateBuffer(next, bufferRnd.nextInt(pageSize) + 1, bufferRnd.nextInt(numChannels));
-				}
-			}
-			assertNull(seq2.getNext());
-			assertEquals(events2.size(), numEvent);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (secondChannel != null) {
-				try {
-					secondChannel.close();
-				}
-				catch (IOException e) {
-					// ignore here
-				}
-			}
-			if (secondFile != null) {
-				//noinspection ResultOfMethodCallIgnored
-				secondFile.delete();
-			}
-		}
-	}
-
-	@Test
-	public void testCleanup() {
-		try {
-			ByteBuffer data = ByteBuffer.allocate(157);
-			data.order(ByteOrder.LITTLE_ENDIAN);
-			
-			fileChannel.write(data);
-			fileChannel.position(54);
-			
-			SpilledBufferOrEventSequence seq = new SpilledBufferOrEventSequence(tempFile, fileChannel, buffer, pageSize);
-			seq.open();
-			seq.cleanup();
-			
-			assertFalse(fileChannel.isOpen());
-			assertFalse(tempFile.exists());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utils
-	// ------------------------------------------------------------------------
-
-	private static BufferOrEvent generateAndWriteEvent(FileChannel fileChannel, Random rnd, int numChannels) throws IOException {
-		long magicNumber = rnd.nextLong();
-		byte[] data = new byte[rnd.nextInt(1000)];
-		rnd.nextBytes(data);
-		TestEvent evt = new TestEvent(magicNumber, data);
-		
-		int channelIndex = rnd.nextInt(numChannels);
-		
-		ByteBuffer serializedEvent = EventSerializer.toSerializedEvent(evt);
-		ByteBuffer header = ByteBuffer.allocate(9);
-		header.order(ByteOrder.LITTLE_ENDIAN);
-		
-		header.putInt(channelIndex);
-		header.putInt(serializedEvent.remaining());
-		header.put((byte) 1);
-		header.flip();
-		
-		fileChannel.write(header);
-		fileChannel.write(serializedEvent);
-		return new BufferOrEvent(evt, channelIndex);
-	}
-	
-	private static void writeBuffer(FileChannel fileChannel, int size, int channelIndex) throws IOException {
-		ByteBuffer data = ByteBuffer.allocate(size + 9);
-		data.order(ByteOrder.LITTLE_ENDIAN);
-		
-		data.putInt(channelIndex);
-		data.putInt(size);
-		data.put((byte) 0);
-		for (int i = 0; i < size; i++) {
-			data.put((byte) i);
-		}
-		data.flip();
-		fileChannel.write(data);
-	}
-
-	private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) {
-		assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex());
-		assertTrue("is not buffer", boe.isBuffer());
-		
-		Buffer buf = boe.getBuffer();
-		assertEquals("wrong buffer size", expectedSize, buf.getSize());
-		
-		MemorySegment seg = buf.getMemorySegment();
-		for (int i = 0; i < expectedSize; i++) {
-			assertEquals("wrong buffer contents", (byte) i, seg.get(i));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
deleted file mode 100644
index 45bbbda..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
-import org.apache.flink.types.LongValue;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.IOException;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * This test uses the PowerMockRunner runner to work around the fact that the 
- * {@link ResultPartitionWriter} class is final.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
-public class StreamRecordWriterTest {
-
-	/**
-	 * Verifies that exceptions during flush from the output flush thread are
-	 * recognized in the writer.
-	 */
-	@Test
-	public void testPropagateAsyncFlushError() {
-		FailingWriter<LongValue> testWriter = null;
-		try {
-			ResultPartitionWriter mockResultPartitionWriter = getMockWriter(5);
-			
-			// test writer that flushes every 5ms and fails after 3 flushes
-			testWriter = new FailingWriter<LongValue>(mockResultPartitionWriter,
-					new RoundRobinChannelSelector<LongValue>(), 5, 3);
-			
-			try {
-				long deadline = System.currentTimeMillis() + 20000; // in max 20 seconds (conservative)
-				long l = 0L;
-				
-				while (System.currentTimeMillis() < deadline) {
-					testWriter.emit(new LongValue(l++));
-				}
-				
-				fail("This should have failed with an exception");
-			}
-			catch (IOException e) {
-				assertNotNull(e.getCause());
-				assertTrue(e.getCause().getMessage().contains("Test Exception"));
-			}
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-		finally {
-			if (testWriter != null) {
-				testWriter.close();
-			}
-		}
-	}
-	
-	private static ResultPartitionWriter getMockWriter(int numPartitions) throws Exception {
-		BufferProvider mockProvider = mock(BufferProvider.class);
-		when(mockProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-			@Override
-			public Buffer answer(InvocationOnMock invocation) {
-				return new Buffer(
-						MemorySegmentFactory.allocateUnpooledSegment(4096),
-						FreeingBufferRecycler.INSTANCE);
-			}
-		});
-		
-		ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
-		when(mockWriter.getBufferProvider()).thenReturn(mockProvider);
-		when(mockWriter.getNumberOfOutputChannels()).thenReturn(numPartitions);
-		
-		
-		return mockWriter;
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private static class FailingWriter<T extends IOReadableWritable> extends StreamRecordWriter<T> {
-		
-		private int flushesBeforeException;
-		
-		private FailingWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector,
-								long timeout, int flushesBeforeException) {
-			super(writer, channelSelector, timeout);
-			this.flushesBeforeException = flushesBeforeException;
-		}
-
-		@Override
-		public void flush() throws IOException {
-			if (flushesBeforeException-- <= 0) {
-				throw new IOException("Test Exception");
-			}
-			super.flush();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
deleted file mode 100644
index 286477a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.util.StringUtils;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * A simple task event, used for validation of buffer or event blocking/buffering.
- */
-public class TestEvent extends AbstractEvent {
-
-	private long magicNumber;
-
-	private byte[] payload;
-
-	public TestEvent() {}
-
-	public TestEvent(long magicNumber, byte[] payload) {
-		this.magicNumber = magicNumber;
-		this.payload = payload;
-	}
-
-
-	// ------------------------------------------------------------------------
-	//  Serialization
-	// ------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(magicNumber);
-		out.writeInt(payload.length);
-		out.write(payload);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.magicNumber = in.readLong();
-		this.payload = new byte[in.readInt()];
-		in.read(this.payload);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Standard utilities
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return Long.valueOf(magicNumber).hashCode();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj != null && obj.getClass() == TestEvent.class) {
-			TestEvent that = (TestEvent) obj;
-			return this.magicNumber == that.magicNumber && Arrays.equals(this.payload, that.payload);
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("TestEvent %d (%s)", magicNumber, StringUtils.byteToHexString(payload));
-	}
-}
\ No newline at end of file