Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:45 UTC

[04/19] flink git commit: [streaming] Major internal renaming and restructure

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
deleted file mode 100644
index 24106c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferIOTest.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.junit.Test;
-
-public class BarrierBufferIOTest {
-
-	@Test
-	public void IOTest() throws IOException, InterruptedException {
-
-		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
-
-		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
-				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
-		// new BarrierSimulator[] { new CountBarrier(1000), new
-		// CountBarrier(1000) });
-
-		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
-				new BarrierBufferTest.MockReader(myIG));
-
-		try {
-			// long time = System.currentTimeMillis();
-			for (int i = 0; i < 2000000; i++) {
-				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
-				if (boe.isBuffer()) {
-					boe.getBuffer().recycle();
-				} else {
-					barrierBuffer.processSuperstep(boe);
-				}
-			}
-			// System.out.println("Ran for " + (System.currentTimeMillis() -
-			// time));
-		} catch (Exception e) {
-			fail();
-		} finally {
-			barrierBuffer.cleanup();
-		}
-	}
-
-	private static class RandomBarrier implements BarrierGenerator {
-		private static Random rnd = new Random();
-
-		double threshold;
-
-		public RandomBarrier(double expectedEvery) {
-			threshold = 1 / expectedEvery;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return rnd.nextDouble() < threshold;
-		}
-	}
-
-	private static class CountBarrier implements BarrierGenerator {
-
-		long every;
-		long c = 0;
-
-		public CountBarrier(long every) {
-			this.every = every;
-		}
-
-		@Override
-		public boolean isNextBarrier() {
-			return c++ % every == 0;
-		}
-	}
-
-	protected static class MockInputGate implements InputGate {
-
-		private int numChannels;
-		private BufferPool[] bufferPools;
-		private int[] currentSupersteps;
-		BarrierGenerator[] barrierGens;
-		int currentChannel = 0;
-		long c = 0;
-
-		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
-			this.numChannels = bufferPools.length;
-			this.currentSupersteps = new int[numChannels];
-			this.bufferPools = bufferPools;
-			this.barrierGens = barrierGens;
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return false;
-		}
-
-		@Override
-		public void requestPartitions() throws IOException, InterruptedException {
-		}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			currentChannel = (currentChannel + 1) % numChannels;
-
-			if (barrierGens[currentChannel].isNextBarrier()) {
-				return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
-						currentChannel);
-			} else {
-				Buffer buffer = bufferPools[currentChannel].requestBuffer();
-				buffer.getMemorySegment().putLong(0, c++);
-
-				return new BufferOrEvent(buffer, currentChannel);
-			}
-
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) throws IOException {
-		}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {
-		}
-
-	}
-
-	protected interface BarrierGenerator {
-		public boolean isNextBarrier();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
deleted file mode 100644
index 62feb35..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/BarrierBufferTest.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
-import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.streaming.api.streamvertex.StreamingSuperstep;
-import org.junit.Test;
-
-public class BarrierBufferTest {
-
-	@Test
-	public void testWithoutBarriers() throws IOException, InterruptedException {
-
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-
-		InputGate mockIG = new MockInputGate(1, input);
-		AbstractReader mockAR = new MockReader(mockIG);
-
-		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
-
-		assertEquals(input.get(0), bb.getNextNonBlocked());
-		assertEquals(input.get(1), bb.getNextNonBlocked());
-		assertEquals(input.get(2), bb.getNextNonBlocked());
-
-		bb.cleanup();
-	}
-
-	@Test
-	public void testOneChannelBarrier() throws IOException, InterruptedException {
-
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createSuperstep(1, 0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(0));
-		input.add(createSuperstep(2, 0));
-		input.add(createBuffer(0));
-
-		InputGate mockIG = new MockInputGate(1, input);
-		AbstractReader mockAR = new MockReader(mockIG);
-
-		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
-		BufferOrEvent nextBoe;
-
-		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
-		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
-
-		bb.cleanup();
-	}
-
-	@Test
-	public void testMultiChannelBarrier() throws IOException, InterruptedException {
-
-		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createSuperstep(1, 0));
-		input.add(createSuperstep(2, 0));
-		input.add(createBuffer(0));
-		input.add(createSuperstep(3, 0));
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createSuperstep(1, 1));
-		input.add(createBuffer(0));
-		input.add(createBuffer(1));
-		input.add(createSuperstep(2, 1));
-		input.add(createSuperstep(3, 1));
-		input.add(createSuperstep(4, 0));
-		input.add(createBuffer(0));
-		input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
-		
-
-		InputGate mockIG1 = new MockInputGate(2, input);
-		AbstractReader mockAR1 = new MockReader(mockIG1);
-
-		BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
-		BufferOrEvent nextBoe;
-
-		check(input.get(0), nextBoe = bb.getNextNonBlocked());
-		check(input.get(1), nextBoe = bb.getNextNonBlocked());
-		check(input.get(2), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(7), nextBoe = bb.getNextNonBlocked());
-		check(input.get(8), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(3), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(10), nextBoe = bb.getNextNonBlocked());
-		check(input.get(11), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(4), nextBoe = bb.getNextNonBlocked());
-		check(input.get(5), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(12), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(6), nextBoe = bb.getNextNonBlocked());
-		check(input.get(9), nextBoe = bb.getNextNonBlocked());
-		check(input.get(13), nextBoe = bb.getNextNonBlocked());
-		bb.processSuperstep(nextBoe);
-		check(input.get(14), nextBoe = bb.getNextNonBlocked());
-		check(input.get(15), nextBoe = bb.getNextNonBlocked());
-
-		bb.cleanup();
-	}
-
-	private static void check(BufferOrEvent expected, BufferOrEvent actual) {
-		assertEquals(expected.isBuffer(), actual.isBuffer());
-		assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
-		if (expected.isEvent()) {
-			assertEquals(expected.getEvent(), actual.getEvent());
-		}
-	}
-
-	protected static class MockInputGate implements InputGate {
-
-		private int numChannels;
-		private Queue<BufferOrEvent> boes;
-
-		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
-			this.numChannels = numChannels;
-			this.boes = new LinkedList<BufferOrEvent>(boes);
-		}
-
-		@Override
-		public int getNumberOfInputChannels() {
-			return numChannels;
-		}
-
-		@Override
-		public boolean isFinished() {
-			return boes.isEmpty();
-		}
-
-		@Override
-		public void requestPartitions() throws IOException, InterruptedException {
-		}
-
-		@Override
-		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
-			return boes.remove();
-		}
-
-		@Override
-		public void sendTaskEvent(TaskEvent event) throws IOException {
-		}
-
-		@Override
-		public void registerListener(EventListener<InputGate> listener) {
-		}
-
-	}
-
-	protected static class MockReader extends AbstractReader {
-
-		protected MockReader(InputGate inputGate) {
-			super(inputGate);
-		}
-
-	}
-
-	protected static BufferOrEvent createSuperstep(long id, int channel) {
-		return new BufferOrEvent(new StreamingSuperstep(id), channel);
-	}
-
-	protected static BufferOrEvent createBuffer(int channel) {
-		return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
-				new BufferRecycler() {
-
-					@Override
-					public void recycle(MemorySegment memorySegment) {
-					}
-				}), channel);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
deleted file mode 100644
index 1e57d14..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/CoRecordReaderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.io.BarrierBufferTest.MockInputGate;
-import org.junit.Test;
-
-public class CoRecordReaderTest {
-
-	@Test
-	public void test() throws InterruptedException, IOException {
-
-		List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
-		input1.add(BarrierBufferTest.createBuffer(0));
-		input1.add(BarrierBufferTest.createSuperstep(1, 0));
-		input1.add(BarrierBufferTest.createBuffer(0));
-
-		InputGate ig1 = new MockInputGate(1, input1);
-
-		List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
-		input2.add(BarrierBufferTest.createBuffer(0));
-		input2.add(BarrierBufferTest.createBuffer(0));
-		input2.add(BarrierBufferTest.createSuperstep(1, 0));
-		input2.add(BarrierBufferTest.createBuffer(0));
-
-		InputGate ig2 = new MockInputGate(1, input2);
-
-		CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
-				ig1, ig2);
-		BarrierBuffer b1 = coReader.barrierBuffer1;
-		BarrierBuffer b2 = coReader.barrierBuffer2;
-
-		coReader.addToAvailable(ig1);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig1);
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-		b1.processSuperstep(input1.get(1));
-
-		coReader.addToAvailable(ig1);
-		coReader.addToAvailable(ig2);
-		coReader.addToAvailable(ig2);
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-		b2.processSuperstep(input2.get(2));
-
-		assertEquals(1, coReader.getNextReaderIndexBlocking());
-		b1.getNextNonBlocked();
-
-		assertEquals(2, coReader.getNextReaderIndexBlocking());
-		b2.getNextNonBlocked();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
deleted file mode 100644
index 30fc820..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/io/SpillingBufferOrEventTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.io;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.junit.Test;
-
-public class SpillingBufferOrEventTest {
-
-	@Test
-	public void testSpilling() throws IOException, InterruptedException {
-		BufferSpiller bsp = new BufferSpiller();
-		SpillReader spr = new SpillReader();
-
-		BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
-		BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
-
-		Buffer b1 = pool1.requestBuffer();
-		b1.getMemorySegment().putInt(0, 10000);
-		BufferOrEvent boe1 = new BufferOrEvent(b1, 2);
-		SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
-
-		assertTrue(sboe1.isSpilled());
-
-		Buffer b2 = pool2.requestBuffer();
-		b2.getMemorySegment().putInt(0, 10000);
-		BufferOrEvent boe2 = new BufferOrEvent(b2, 4);
-		SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
-
-		assertTrue(sboe2.isSpilled());
-
-		Buffer b3 = pool1.requestBuffer();
-		b3.getMemorySegment().putInt(0, 50000);
-		BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
-		SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
-
-		assertTrue(sboe3.isSpilled());
-
-		Buffer b4 = pool2.requestBuffer();
-		b4.getMemorySegment().putInt(0, 60000);
-		BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
-		SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
-
-		assertTrue(sboe4.isSpilled());
-
-		bsp.close();
-
-		spr.setSpillFile(bsp.getSpillFile());
-
-		Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
-		assertEquals(10000, b1ret.getMemorySegment().getInt(0));
-		assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex());
-		b1ret.recycle();
-
-		Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
-		assertEquals(10000, b2ret.getMemorySegment().getInt(0));
-		assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex());
-		b2ret.recycle();
-
-		Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
-		assertEquals(50000, b3ret.getMemorySegment().getInt(0));
-		assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex());
-		b3ret.recycle();
-
-		Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
-		assertEquals(60000, b4ret.getMemorySegment().getInt(0));
-		b4ret.recycle();
-
-		spr.close();
-		bsp.getSpillFile().delete();
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
deleted file mode 100644
index de6c200..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/BroadcastPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class BroadcastPartitionerTest {
-
-	private BroadcastPartitioner<Tuple> broadcastPartitioner1;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
-	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
-	
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
-
-	@Before
-	public void setPartitioner() {
-		broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
-		broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
-		broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
-
-	}
-
-	@Test
-	public void testSelectChannels() {
-		int[] first = new int[] { 0 };
-		int[] second = new int[] { 0, 1 };
-		int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
-		sd.setInstance(streamRecord);
-		assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
-		assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
-		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
deleted file mode 100644
index 0675242..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/DistributePartitionerTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class DistributePartitionerTest {
-	
-	private DistributePartitioner<Tuple> distributePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-	
-	@Before
-	public void setPartitioner() {
-		distributePartitioner = new DistributePartitioner<Tuple>(false);
-	}
-	
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
-	}
-	
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
-		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
deleted file mode 100644
index b56649b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/FieldsPartitionerTest.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FieldsPartitionerTest {
-
-	private FieldsPartitioner<Tuple> fieldsPartitioner;
-	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
-			.setObject(new Tuple2<String, Integer>("test", 0));
-	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
-			.setObject(new Tuple2<String, Integer>("test", 42));
-	private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-	private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple value) throws Exception {
-				return value.getField(0);
-			}
-		});
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd1.setInstance(streamRecord1);
-		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
-		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
-		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsGrouping() {
-		sd1.setInstance(streamRecord1);
-		sd2.setInstance(streamRecord2);
-
-		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
-				fieldsPartitioner.selectChannels(sd2, 1));
-		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
-				fieldsPartitioner.selectChannels(sd2, 2));
-		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
-				fieldsPartitioner.selectChannels(sd2, 1024));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
deleted file mode 100644
index b381d85..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ForwardPartitionerTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.*;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ForwardPartitionerTest {
-
-	private DistributePartitioner<Tuple> forwardPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		forwardPartitioner = new DistributePartitioner<Tuple>(true);
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
-		assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
-		assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
deleted file mode 100644
index eebda64..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/GlobalPartitionerTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class GlobalPartitionerTest {
-
-	private GlobalPartitioner<Tuple> globalPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		globalPartitioner = new GlobalPartitioner<Tuple>();
-	}
-
-	@Test
-	public void testSelectChannels() {
-		int[] result = new int[] { 0 };
-
-		sd.setInstance(streamRecord);
-
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
-		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
deleted file mode 100644
index 3c03d07..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/partitioner/ShufflePartitionerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.partitioner;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ShufflePartitionerTest {
-
-	private ShufflePartitioner<Tuple> shufflePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
-	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
-			null);
-
-	@Before
-	public void setPartitioner() {
-		shufflePartitioner = new ShufflePartitioner<Tuple>();
-	}
-
-	@Test
-	public void testSelectChannelsLength() {
-		sd.setInstance(streamRecord);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
-		assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
-	}
-
-	@Test
-	public void testSelectChannelsInterval() {
-		sd.setInstance(streamRecord);
-		assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
-
-		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
-		assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
-
-		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
-		assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
new file mode 100644
index 0000000..3f8401d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferIOTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.junit.Test;
+
+public class BarrierBufferIOTest {
+
+	@Test
+	public void IOTest() throws IOException, InterruptedException {
+
+		BufferPool pool1 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+		BufferPool pool2 = new NetworkBufferPool(100, 1024).createBufferPool(100, true);
+
+		MockInputGate myIG = new MockInputGate(new BufferPool[] { pool1, pool2 },
+				new BarrierGenerator[] { new CountBarrier(100000), new RandomBarrier(100000) });
+		// new BarrierSimulator[] { new CountBarrier(1000), new
+		// CountBarrier(1000) });
+
+		BarrierBuffer barrierBuffer = new BarrierBuffer(myIG,
+				new BarrierBufferTest.MockReader(myIG));
+
+		try {
+			// long time = System.currentTimeMillis();
+			for (int i = 0; i < 2000000; i++) {
+				BufferOrEvent boe = barrierBuffer.getNextNonBlocked();
+				if (boe.isBuffer()) {
+					boe.getBuffer().recycle();
+				} else {
+					barrierBuffer.processSuperstep(boe);
+				}
+			}
+			// System.out.println("Ran for " + (System.currentTimeMillis() -
+			// time));
+		} catch (Exception e) {
+			fail();
+		} finally {
+			barrierBuffer.cleanup();
+		}
+	}
+
+	private static class RandomBarrier implements BarrierGenerator {
+		private static Random rnd = new Random();
+
+		double threshold;
+
+		public RandomBarrier(double expectedEvery) {
+			threshold = 1 / expectedEvery;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return rnd.nextDouble() < threshold;
+		}
+	}
+
+	private static class CountBarrier implements BarrierGenerator {
+
+		long every;
+		long c = 0;
+
+		public CountBarrier(long every) {
+			this.every = every;
+		}
+
+		@Override
+		public boolean isNextBarrier() {
+			return c++ % every == 0;
+		}
+	}
+
+	protected static class MockInputGate implements InputGate {
+
+		private int numChannels;
+		private BufferPool[] bufferPools;
+		private int[] currentSupersteps;
+		BarrierGenerator[] barrierGens;
+		int currentChannel = 0;
+		long c = 0;
+
+		public MockInputGate(BufferPool[] bufferPools, BarrierGenerator[] barrierGens) {
+			this.numChannels = bufferPools.length;
+			this.currentSupersteps = new int[numChannels];
+			this.bufferPools = bufferPools;
+			this.barrierGens = barrierGens;
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return false;
+		}
+
+		@Override
+		public void requestPartitions() throws IOException, InterruptedException {
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			currentChannel = (currentChannel + 1) % numChannels;
+
+			if (barrierGens[currentChannel].isNextBarrier()) {
+				return BarrierBufferTest.createSuperstep(++currentSupersteps[currentChannel],
+						currentChannel);
+			} else {
+				Buffer buffer = bufferPools[currentChannel].requestBuffer();
+				buffer.getMemorySegment().putLong(0, c++);
+
+				return new BufferOrEvent(buffer, currentChannel);
+			}
+
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {
+		}
+
+	}
+
+	protected interface BarrierGenerator {
+		public boolean isNextBarrier();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
new file mode 100644
index 0000000..b8af4ed
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.junit.Test;
+
+public class BarrierBufferTest {
+
+	@Test
+	public void testWithoutBarriers() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+
+		InputGate mockIG = new MockInputGate(1, input);
+		AbstractReader mockAR = new MockReader(mockIG);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+
+		assertEquals(input.get(0), bb.getNextNonBlocked());
+		assertEquals(input.get(1), bb.getNextNonBlocked());
+		assertEquals(input.get(2), bb.getNextNonBlocked());
+
+		bb.cleanup();
+	}
+
+	@Test
+	public void testOneChannelBarrier() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(1, 0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(2, 0));
+		input.add(createBuffer(0));
+
+		InputGate mockIG = new MockInputGate(1, input);
+		AbstractReader mockAR = new MockReader(mockIG);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG, mockAR);
+		BufferOrEvent nextBoe;
+
+		assertEquals(input.get(0), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(1), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(2), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(3), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(4), nextBoe = bb.getNextNonBlocked());
+		assertEquals(input.get(5), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		assertEquals(input.get(6), nextBoe = bb.getNextNonBlocked());
+
+		bb.cleanup();
+	}
+
+	@Test
+	public void testMultiChannelBarrier() throws IOException, InterruptedException {
+
+		List<BufferOrEvent> input = new LinkedList<BufferOrEvent>();
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(1, 0));
+		input.add(createSuperstep(2, 0));
+		input.add(createBuffer(0));
+		input.add(createSuperstep(3, 0));
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(1, 1));
+		input.add(createBuffer(0));
+		input.add(createBuffer(1));
+		input.add(createSuperstep(2, 1));
+		input.add(createSuperstep(3, 1));
+		input.add(createSuperstep(4, 0));
+		input.add(createBuffer(0));
+		input.add(new BufferOrEvent(new EndOfPartitionEvent(), 1));
+		
+
+		InputGate mockIG1 = new MockInputGate(2, input);
+		AbstractReader mockAR1 = new MockReader(mockIG1);
+
+		BarrierBuffer bb = new BarrierBuffer(mockIG1, mockAR1);
+		BufferOrEvent nextBoe;
+
+		check(input.get(0), nextBoe = bb.getNextNonBlocked());
+		check(input.get(1), nextBoe = bb.getNextNonBlocked());
+		check(input.get(2), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(7), nextBoe = bb.getNextNonBlocked());
+		check(input.get(8), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(3), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(10), nextBoe = bb.getNextNonBlocked());
+		check(input.get(11), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(4), nextBoe = bb.getNextNonBlocked());
+		check(input.get(5), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(12), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(6), nextBoe = bb.getNextNonBlocked());
+		check(input.get(9), nextBoe = bb.getNextNonBlocked());
+		check(input.get(13), nextBoe = bb.getNextNonBlocked());
+		bb.processSuperstep(nextBoe);
+		check(input.get(14), nextBoe = bb.getNextNonBlocked());
+		check(input.get(15), nextBoe = bb.getNextNonBlocked());
+
+		bb.cleanup();
+	}
+
+	private static void check(BufferOrEvent expected, BufferOrEvent actual) {
+		assertEquals(expected.isBuffer(), actual.isBuffer());
+		assertEquals(expected.getChannelIndex(), actual.getChannelIndex());
+		if (expected.isEvent()) {
+			assertEquals(expected.getEvent(), actual.getEvent());
+		}
+	}
+
+	protected static class MockInputGate implements InputGate {
+
+		private int numChannels;
+		private Queue<BufferOrEvent> boes;
+
+		public MockInputGate(int numChannels, List<BufferOrEvent> boes) {
+			this.numChannels = numChannels;
+			this.boes = new LinkedList<BufferOrEvent>(boes);
+		}
+
+		@Override
+		public int getNumberOfInputChannels() {
+			return numChannels;
+		}
+
+		@Override
+		public boolean isFinished() {
+			return boes.isEmpty();
+		}
+
+		@Override
+		public void requestPartitions() throws IOException, InterruptedException {
+		}
+
+		@Override
+		public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
+			return boes.remove();
+		}
+
+		@Override
+		public void sendTaskEvent(TaskEvent event) throws IOException {
+		}
+
+		@Override
+		public void registerListener(EventListener<InputGate> listener) {
+		}
+
+	}
+
+	protected static class MockReader extends AbstractReader {
+
+		protected MockReader(InputGate inputGate) {
+			super(inputGate);
+		}
+
+	}
+
+	protected static BufferOrEvent createSuperstep(long id, int channel) {
+		return new BufferOrEvent(new StreamingSuperstep(id), channel);
+	}
+
+	protected static BufferOrEvent createBuffer(int channel) {
+		return new BufferOrEvent(new Buffer(new MemorySegment(new byte[] { 1 }),
+				new BufferRecycler() {
+
+					@Override
+					public void recycle(MemorySegment memorySegment) {
+					}
+				}), channel);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
new file mode 100644
index 0000000..528829d
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/CoRecordReaderTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.runtime.io.BarrierBuffer;
+import org.apache.flink.streaming.runtime.io.CoRecordReader;
+import org.apache.flink.streaming.runtime.io.BarrierBufferTest.MockInputGate;
+import org.junit.Test;
+
+public class CoRecordReaderTest {
+
+	@Test
+	public void test() throws InterruptedException, IOException {
+
+		List<BufferOrEvent> input1 = new LinkedList<BufferOrEvent>();
+		input1.add(BarrierBufferTest.createBuffer(0));
+		input1.add(BarrierBufferTest.createSuperstep(1, 0));
+		input1.add(BarrierBufferTest.createBuffer(0));
+
+		InputGate ig1 = new MockInputGate(1, input1);
+
+		List<BufferOrEvent> input2 = new LinkedList<BufferOrEvent>();
+		input2.add(BarrierBufferTest.createBuffer(0));
+		input2.add(BarrierBufferTest.createBuffer(0));
+		input2.add(BarrierBufferTest.createSuperstep(1, 0));
+		input2.add(BarrierBufferTest.createBuffer(0));
+
+		InputGate ig2 = new MockInputGate(1, input2);
+
+		CoRecordReader<?, ?> coReader = new CoRecordReader<IOReadableWritable, IOReadableWritable>(
+				ig1, ig2);
+		BarrierBuffer b1 = coReader.barrierBuffer1;
+		BarrierBuffer b2 = coReader.barrierBuffer2;
+
+		coReader.addToAvailable(ig1);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig1);
+
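+		// the gates were made available in the order ig1, ig2, ig2, ig1, so the reader indexes come back as 1, 2, 2, 1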
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
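+		// input1.get(1) is the superstep barrier created above; hand it to barrier buffer 1 for processing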
+		b1.processSuperstep(input1.get(1));
+
+		coReader.addToAvailable(ig1);
+		coReader.addToAvailable(ig2);
+		coReader.addToAvailable(ig2);
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+		b2.processSuperstep(input2.get(2));
+
+		assertEquals(1, coReader.getNextReaderIndexBlocking());
+		b1.getNextNonBlocked();
+
+		assertEquals(2, coReader.getNextReaderIndexBlocking());
+		b2.getNextNonBlocked();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
new file mode 100644
index 0000000..e0fab17
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/io/SpillingBufferOrEventTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.streaming.runtime.io.BufferSpiller;
+import org.apache.flink.streaming.runtime.io.SpillReader;
+import org.apache.flink.streaming.runtime.io.SpillingBufferOrEvent;
+import org.junit.Test;
+
+public class SpillingBufferOrEventTest {
+
+	@Test
+	public void testSpilling() throws IOException, InterruptedException {
+		BufferSpiller bsp = new BufferSpiller();
+		SpillReader spr = new SpillReader();
+
+		BufferPool pool1 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+		BufferPool pool2 = new NetworkBufferPool(10, 256).createBufferPool(2, true);
+
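+		// push four small buffers through the spiller; each one is expected to be spilled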
+		Buffer b1 = pool1.requestBuffer();
+		b1.getMemorySegment().putInt(0, 10000);
+		BufferOrEvent boe1 = new BufferOrEvent(b1, 2);
+		SpillingBufferOrEvent sboe1 = new SpillingBufferOrEvent(boe1, bsp, spr);
+
+		assertTrue(sboe1.isSpilled());
+
+		Buffer b2 = pool2.requestBuffer();
+		b2.getMemorySegment().putInt(0, 10000);
+		BufferOrEvent boe2 = new BufferOrEvent(b2, 4);
+		SpillingBufferOrEvent sboe2 = new SpillingBufferOrEvent(boe2, bsp, spr);
+
+		assertTrue(sboe2.isSpilled());
+
+		Buffer b3 = pool1.requestBuffer();
+		b3.getMemorySegment().putInt(0, 50000);
+		BufferOrEvent boe3 = new BufferOrEvent(b3, 0);
+		SpillingBufferOrEvent sboe3 = new SpillingBufferOrEvent(boe3, bsp, spr);
+
+		assertTrue(sboe3.isSpilled());
+
+		Buffer b4 = pool2.requestBuffer();
+		b4.getMemorySegment().putInt(0, 60000);
+		BufferOrEvent boe4 = new BufferOrEvent(b4, 0);
+		SpillingBufferOrEvent sboe4 = new SpillingBufferOrEvent(boe4, bsp, spr);
+
+		assertTrue(sboe4.isSpilled());
+
+		bsp.close();
+
+		spr.setSpillFile(bsp.getSpillFile());
+
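+		// read the spilled buffers back from the spill file and verify their contents and channel indexes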
+		Buffer b1ret = sboe1.getBufferOrEvent().getBuffer();
+		assertEquals(10000, b1ret.getMemorySegment().getInt(0));
+		assertEquals(2, sboe1.getBufferOrEvent().getChannelIndex());
+		b1ret.recycle();
+
+		Buffer b2ret = sboe2.getBufferOrEvent().getBuffer();
+		assertEquals(10000, b2ret.getMemorySegment().getInt(0));
+		assertEquals(4, sboe2.getBufferOrEvent().getChannelIndex());
+		b2ret.recycle();
+
+		Buffer b3ret = sboe3.getBufferOrEvent().getBuffer();
+		assertEquals(50000, b3ret.getMemorySegment().getInt(0));
+		assertEquals(0, sboe3.getBufferOrEvent().getChannelIndex());
+		b3ret.recycle();
+
+		Buffer b4ret = sboe4.getBufferOrEvent().getBuffer();
+		assertEquals(60000, b4ret.getMemorySegment().getInt(0));
+		b4ret.recycle();
+
+		spr.close();
+		bsp.getSpillFile().delete();
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
new file mode 100644
index 0000000..aa4d24a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/BroadcastPartitionerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class BroadcastPartitionerTest {
+
+	private BroadcastPartitioner<Tuple> broadcastPartitioner1;
+	private BroadcastPartitioner<Tuple> broadcastPartitioner2;
+	private BroadcastPartitioner<Tuple> broadcastPartitioner3;
+	
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(null);
+
+	@Before
+	public void setPartitioner() {
+		broadcastPartitioner1 = new BroadcastPartitioner<Tuple>();
+		broadcastPartitioner2 = new BroadcastPartitioner<Tuple>();
+		broadcastPartitioner3 = new BroadcastPartitioner<Tuple>();
+
+	}
+
+	@Test
+	public void testSelectChannels() {
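+		// broadcasting must select every output channel, whatever the channel count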
+		int[] first = new int[] { 0 };
+		int[] second = new int[] { 0, 1 };
+		int[] sixth = new int[] { 0, 1, 2, 3, 4, 5 };
+		sd.setInstance(streamRecord);
+		assertArrayEquals(first, broadcastPartitioner1.selectChannels(sd, 1));
+		assertArrayEquals(second, broadcastPartitioner2.selectChannels(sd, 2));
+		assertArrayEquals(sixth, broadcastPartitioner3.selectChannels(sd, 6));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
new file mode 100644
index 0000000..37638cf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/DistributePartitionerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DistributePartitionerTest {
+	
+	private DistributePartitioner<Tuple> distributePartitioner;
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+	
+	@Before
+	public void setPartitioner() {
+		distributePartitioner = new DistributePartitioner<Tuple>(false);
+	}
+	
+	@Test
+	public void testSelectChannelsLength() {
+		sd.setInstance(streamRecord);
+		assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
+		assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
+		assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
+	}
+	
+	@Test
+	public void testSelectChannelsInterval() {
+		sd.setInstance(streamRecord);
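+		// distribution is round-robin: successive records cycle through the three channels and wrap around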
+		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+		assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
+		assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
+		assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
new file mode 100644
index 0000000..94d29ac
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/FieldsPartitionerTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FieldsPartitionerTest {
+
+	private FieldsPartitioner<Tuple> fieldsPartitioner;
+	private StreamRecord<Tuple> streamRecord1 = new StreamRecord<Tuple>()
+			.setObject(new Tuple2<String, Integer>("test", 0));
+	private StreamRecord<Tuple> streamRecord2 = new StreamRecord<Tuple>()
+			.setObject(new Tuple2<String, Integer>("test", 42));
+	private SerializationDelegate<StreamRecord<Tuple>> sd1 = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+	private SerializationDelegate<StreamRecord<Tuple>> sd2 = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+
+	@Before
+	public void setPartitioner() {
+		fieldsPartitioner = new FieldsPartitioner<Tuple>(new KeySelector<Tuple, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Tuple value) throws Exception {
+				return value.getField(0);
+			}
+		});
+	}
+
+	@Test
+	public void testSelectChannelsLength() {
+		sd1.setInstance(streamRecord1);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1).length);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 2).length);
+		assertEquals(1, fieldsPartitioner.selectChannels(sd1, 1024).length);
+	}
+
+	@Test
+	public void testSelectChannelsGrouping() {
+		sd1.setInstance(streamRecord1);
+		sd2.setInstance(streamRecord2);
+
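+		// both records carry the same key ("test"), so they must be assigned to the same channel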
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1),
+				fieldsPartitioner.selectChannels(sd2, 1));
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 2),
+				fieldsPartitioner.selectChannels(sd2, 2));
+		assertArrayEquals(fieldsPartitioner.selectChannels(sd1, 1024),
+				fieldsPartitioner.selectChannels(sd2, 1024));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
new file mode 100644
index 0000000..c0d39da
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.DistributePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ForwardPartitionerTest {
+
+	private DistributePartitioner<Tuple> forwardPartitioner;
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+
+	@Before
+	public void setPartitioner() {
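+		// the forward behaviour is provided by DistributePartitioner created with its boolean constructor flag set to true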
+		forwardPartitioner = new DistributePartitioner<Tuple>(true);
+	}
+
+	@Test
+	public void testSelectChannelsLength() {
+		sd.setInstance(streamRecord);
+		assertEquals(1, forwardPartitioner.selectChannels(sd, 1).length);
+		assertEquals(1, forwardPartitioner.selectChannels(sd, 2).length);
+		assertEquals(1, forwardPartitioner.selectChannels(sd, 1024).length);
+	}
+
+	@Test
+	public void testSelectChannelsInterval() {
+		sd.setInstance(streamRecord);
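+		// selections advance across calls (0, then 1, then 2), even as the channel count changes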
+		assertEquals(0, forwardPartitioner.selectChannels(sd, 1)[0]);
+		assertEquals(1, forwardPartitioner.selectChannels(sd, 2)[0]);
+		assertEquals(2, forwardPartitioner.selectChannels(sd, 1024)[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
new file mode 100644
index 0000000..69c00cd
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class GlobalPartitionerTest {
+
+	private GlobalPartitioner<Tuple> globalPartitioner;
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+
+	@Before
+	public void setPartitioner() {
+		globalPartitioner = new GlobalPartitioner<Tuple>();
+	}
+
+	@Test
+	public void testSelectChannels() {
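+		// the global partitioner always routes records to channel 0, independent of the channel count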
+		int[] result = new int[] { 0 };
+
+		sd.setInstance(streamRecord);
+
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1));
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 2));
+		assertArrayEquals(result, globalPartitioner.selectChannels(sd, 1024));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
new file mode 100644
index 0000000..d99a21e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.partitioner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ShufflePartitionerTest {
+
+	private ShufflePartitioner<Tuple> shufflePartitioner;
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
+			null);
+
+	@Before
+	public void setPartitioner() {
+		shufflePartitioner = new ShufflePartitioner<Tuple>();
+	}
+
+	@Test
+	public void testSelectChannelsLength() {
+		sd.setInstance(streamRecord);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 1).length);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 2).length);
+		assertEquals(1, shufflePartitioner.selectChannels(sd, 1024).length);
+	}
+
+	@Test
+	public void testSelectChannelsInterval() {
+		sd.setInstance(streamRecord);
+		assertEquals(0, shufflePartitioner.selectChannels(sd, 1)[0]);
+
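+		// with more than one channel, only the valid range can be asserted: the selection must lie within [0, numberOfChannels)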
+		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 2)[0]);
+		assertTrue(2 > shufflePartitioner.selectChannels(sd, 2)[0]);
+
+		assertTrue(0 <= shufflePartitioner.selectChannels(sd, 1024)[0]);
+		assertTrue(1024 > shufflePartitioner.selectChannels(sd, 1024)[0]);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
deleted file mode 100644
index 5b98e8c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/state/OperatorStateTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateCheckpoint;
-import org.junit.Test;
-
-public class OperatorStateTest {
-
-	@Test
-	public void testOperatorState() {
-		OperatorState<Integer> os = new OperatorState<Integer>(5);
-
-		StateCheckpoint<Integer> scp = os.checkpoint();
-
-		assertTrue(os.stateEquals(scp.restore()));
-
-		assertEquals((Integer) 5, os.getState());
-
-		os.update(10);
-
-		assertEquals((Integer) 10, os.getState());
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
index 98a6a8d..697c796 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -27,13 +27,13 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.co.CoStreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -153,17 +153,17 @@ public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
 		return mockIterator;
 	}
 
-	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+	public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoStreamOperator<IN1, IN2, OUT> operator,
 			List<IN1> input1, List<IN2> input2) {
 		MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
-		invokable.setup(mockContext);
+		operator.setup(mockContext);
 
 		try {
-			invokable.open(null);
-			invokable.invoke();
-			invokable.close();
+			operator.open(null);
+			operator.run();
+			operator.close();
 		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke invokable.", e);
+			throw new RuntimeException("Cannot invoke operator.", e);
 		}
 
 		return mockContext.getOutputs();

http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index af836e2..709a59a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -26,13 +26,13 @@ import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamConfig;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.streaming.io.IndexedReaderIterator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.io.CoReaderIterator;
+import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
@@ -104,16 +104,16 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
 		return iterator;
 	}
 
-	public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable,
+	public static <IN, OUT> List<OUT> createAndExecute(StreamOperator<IN, OUT> operator,
 			List<IN> inputs) {
 		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-		invokable.setup(mockContext);
+		operator.setup(mockContext);
 		try {
-			invokable.open(null);
-			invokable.invoke();
-			invokable.close();
+			operator.open(null);
+			operator.run();
+			operator.close();
 		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke invokable.", e);
+			throw new RuntimeException("Cannot invoke operator.", e);
 		}
 
 		return mockContext.getOutputs();