You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:12 UTC

[2/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index 405a28e..856f7aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 public class ForwardPartitionerTest {
 
 	private RebalancePartitioner<Tuple> forwardPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
index 69c00cd..6ae3730 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -29,7 +28,7 @@ import org.junit.Test;
 public class GlobalPartitionerTest {
 
 	private GlobalPartitioner<Tuple> globalPartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
index d99a21e..aff177c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
 
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Before;
 import org.junit.Test;
@@ -30,7 +29,7 @@ import org.junit.Test;
 public class ShufflePartitionerTest {
 
 	private ShufflePartitioner<Tuple> shufflePartitioner;
-	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+	private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
 	private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
 			null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
new file mode 100644
index 0000000..d623dd8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Tests for {@link OneInputStreamTask}.
+ *
+ * <p>
+ * Note:<br>
+ * We only use a {@link StreamMap} operator here. We also test the individual operators but Map is
+ * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all
+ * OneInputStreamOperators.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+public class OneInputStreamTaskTest {
+
+	/**
+	 * This test verifies that open() and close() are correctly called. This test also verifies
+	 * that timestamps of emitted elements are correct. {@link StreamMap} assigns the input
+	 * timestamp to emitted elements.
+	 */
+	@Test
+	public void testOpenCloseAndTimestamps() throws Exception {
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
+		streamConfig.setStreamOperator(mapOperator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.invoke();
+
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1));
+		testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+	}
+
+	/**
+	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * watermarks are forwarded only when we have received watermarks from all inputs. The
+	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testWatermarkForwarding() throws Exception {
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+		streamConfig.setStreamOperator(mapOperator);
+
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processElement(new Watermark(initialTime), 0, 0);
+		testHarness.processElement(new Watermark(initialTime), 0, 1);
+		testHarness.processElement(new Watermark(initialTime), 1, 0);
+
+		// now the output should still be empty
+		testHarness.waitForInputProcessing();
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.processElement(new Watermark(initialTime), 1, 1);
+
+		// now the watermark should have propagated, Map simply forward Watermarks
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+
+		// contrary to checkpoint barriers these elements are not blocked by watermarks
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
+		testHarness.processElement(new StreamRecord<String>("Ciao", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao", initialTime));
+
+		testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
+		testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
+
+		// check whether we get the minimum of all the watermarks, this must also only occur in
+		// the output after the two StreamRecords
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 2));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+
+		// advance watermark from one of the inputs, now we should get a now one since the
+		// minimum increases
+		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 3));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// advance the other two inputs, now we should get a new one since the
+		// minimum increases again
+		testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
+		testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 4));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		Assert.assertEquals(2, resultElements.size());
+	}
+
+	/**
+	 * This test verifies that checkpoint barriers are correctly forwarded.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCheckpointBarriers() throws Exception {
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+		streamConfig.setStreamOperator(mapOperator);
+
+		Queue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+		// These elements should be buffered until we receive barriers from
+		// all inputs
+		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
+		// on that input, only add to same input, otherwise we would not know the ordering
+		// of the output since the Task might read the inputs in any order
+		testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
+		testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
+		expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
+
+		testHarness.waitForInputProcessing();
+		// we should not yet see the barrier, only the two elements from non-blocked input
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+		testHarness.waitForInputProcessing();
+
+		// now we should see the barrier and after that the buffered elements
+		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	/**
+	 * This test verifies that checkpoint barriers and barrier buffers work correctly with
+	 * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
+	 * some inputs receive barriers from an earlier checkpoint, thereby blocking,
+	 * then all inputs receive barriers from a later checkpoint.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testOvertakingCheckpointBarriers() throws Exception {
+		final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+		final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+		streamConfig.setStreamOperator(mapOperator);
+
+		Queue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+		// These elements should be buffered until we receive barriers from
+		// all inputs
+		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
+		// on that input, only add to same input, otherwise we would not know the ordering
+		// of the output since the Task might read the inputs in any order
+		testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
+		testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
+		expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
+
+		testHarness.waitForInputProcessing();
+		// we should not yet see the barrier, only the two elements from non-blocked input
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// Now give a later barrier to all inputs, this should unblock the first channel,
+		// thereby allowing the two blocked elements through
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+
+		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+		expectedOutput.add(new CheckpointBarrier(1, 1));
+
+		testHarness.waitForInputProcessing();
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+
+		// Then give the earlier barrier, these should be ignored
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+		testHarness.waitForInputProcessing();
+
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value;
+		}
+	}
+
+	private static class IdentityMap implements MapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(String value) throws Exception {
+			return value;
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
new file mode 100644
index 0000000..a8029e6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+
+import java.io.IOException;
+
+
+/**
+ * Test harness for testing a {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved list.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish. Use {@link #processElement} to send elements to the task. Use
+ * {@link #processEvent(AbstractEvent)} to send events to the task.
+ * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
+ * that data entry is finished.
+ *
+ * <p>
+ * When Elements or Events are offered to the Task they are put into a queue. The input gates
+ * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * queues are empty. This must be used after entering some elements before checking the
+ * desired output.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to setup Powermock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
+
+	private TypeInformation<IN> inputType;
+	private TypeSerializer<IN> inputSerializer;
+
+	/**
+	 * Creates a test harness with the specified number of input gates and specified number
+	 * of channels per input gate.
+	 */
+	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
+			int numInputGates,
+
+			int numInputChannelsPerGate,
+			TypeInformation<IN> inputType,
+			TypeInformation<OUT> outputType) {
+		super(task, outputType);
+
+		this.inputType = inputType;
+		inputSerializer = inputType.createSerializer(executionConfig);
+
+		this.numInputGates = numInputGates;
+		this.numInputChannelsPerGate = numInputChannelsPerGate;
+	}
+
+	/**
+	 * Creates a test harness with one input gate that has one input channel.
+	 */
+	public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
+			TypeInformation<IN> inputType,
+			TypeInformation<OUT> outputType) {
+		this(task, 1, 1, inputType, outputType);
+	}
+
+	@Override
+	protected void initializeInputs() throws IOException, InterruptedException {
+		inputGates = new StreamTestSingleInputGate[numInputGates];
+
+		for (int i = 0; i < numInputGates; i++) {
+			inputGates[i] = new StreamTestSingleInputGate<IN>(
+					numInputChannelsPerGate,
+					bufferSize,
+					inputSerializer);
+			this.mockEnv.addInputGate(inputGates[i].getInputGate());
+		}
+
+
+		streamConfig.setNumberOfInputs(1);
+		streamConfig.setTypeSerializerIn1(inputSerializer);
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index b4877c6..f34eafe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,50 +18,63 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These tests verify that the RichFunction methods are called (in correct order). And that
+ * checkpointing/element emission don't occur concurrently.
+ */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, ResultPartitionWriter.class})
-public class SourceStreamTaskTest extends StreamTaskTestBase {
+@PrepareForTest({ResultPartitionWriter.class})
+public class SourceStreamTaskTest {
 
-	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
 
-	private static final int NETWORK_BUFFER_SIZE = 1024;
+	/**
+	 * This test verifies that open() and close() are correctly called by the StreamTask.
+	 */
+	@Test
+	public void testOpenClose() throws Exception {
+		final SourceStreamTask<String> sourceTask = new SourceStreamTask<String>();
+		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamSource<String> sourceOperator = new StreamSource<String>(new OpenCloseTestSource());
+		streamConfig.setStreamOperator(sourceOperator);
+
+		testHarness.invoke();
+		testHarness.waitForTaskCompletion();
+
+		Assert.assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		Assert.assertEquals(10, resultElements.size());
+	}
 
 	/**
 	 * This test ensures that the SourceStreamTask properly serializes checkpointing
@@ -76,7 +89,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 	 * source kept emitting elements while the checkpoint was ongoing.
 	 */
 	@Test
-	public void testDataSourceTask() throws Exception {
+	public void testCheckpointing() throws Exception {
 		final int NUM_ELEMENTS = 100;
 		final int NUM_CHECKPOINTS = 100;
 		final int NUM_CHECKPOINTERS = 1;
@@ -84,38 +97,15 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
 		final int SOURCE_READ_DELAY = 1; // in ms
 
-		List<Tuple2<Long, Integer>> outList = new ArrayList<Tuple2<Long, Integer>>();
-
-		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
-
-		StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
 
+		final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
 		final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+		final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
 
-		TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-		TypeSerializer<Tuple2<Long, Integer>> serializer = typeInfo.createSerializer(new ExecutionConfig());
-		StreamRecordSerializer<Tuple2<Long, Integer>> streamSerializer = new StreamRecordSerializer<Tuple2<Long, Integer>>(typeInfo, new ExecutionConfig());
-
-		super.addOutput(outList, serializer);
-
-		StreamConfig streamConfig = super.getStreamConfig();
-
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
 		streamConfig.setStreamOperator(sourceOperator);
-		streamConfig.setChainStart();
-		streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
-		streamConfig.setNumberOfOutputs(1);
-
-		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-		StreamNode sourceVertex = new StreamNode(null, 0, sourceOperator, "source", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(null, 1, sourceOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
 
-		outEdgesInOrder.add(new StreamEdge(sourceVertex, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
-		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
-		streamConfig.setNonChainedOutputs(outEdgesInOrder);
-		streamConfig.setTypeSerializerOut1(streamSerializer);
-		streamConfig.setVertexID(0);
-
-		super.registerTask(sourceTask);
 
 		ExecutorService executor = Executors.newFixedThreadPool(10);
 		Future[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
@@ -123,13 +113,8 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 			checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
 		}
 
-
-		try {
-			sourceTask.invoke();
-		} catch (Exception e) {
-			System.err.println(StringUtils.stringifyException(e));
-			Assert.fail("Invoke method caused exception.");
-		}
+		testHarness.invoke();
+		testHarness.waitForTaskCompletion();
 
 		// Get the result from the checkpointers, if these threw an exception it
 		// will be rethrown here
@@ -142,11 +127,11 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 			}
 		}
 
-		Assert.assertEquals(NUM_ELEMENTS, outList.size());
+		List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
 	}
 
-	private static class MockSource extends RichSourceFunction<Tuple2<Long, Integer>> implements StateCheckpointer<Integer, Integer> {
-
+	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
 		private static final long serialVersionUID = 1;
 
 		private int maxElements;
@@ -157,7 +142,6 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		private volatile long lastCheckpointId = -1;
 
 		private Semaphore semaphore;
-		private OperatorState<Integer> state;
 
 		private volatile boolean isRunning = true;
 
@@ -166,7 +150,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 			this.checkpointDelay = checkpointDelay;
 			this.readDelay = readDelay;
 			this.count = 0;
-			this.semaphore = new Semaphore(1);
+			semaphore = new Semaphore(1);
 		}
 
 		@Override
@@ -191,33 +175,32 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		public void cancel() {
 			isRunning = false;
 		}
-		
-		@Override
-		public void open(Configuration conf) throws IOException{
-			state = getRuntimeContext().getOperatorState("state", 1, false, this);
-		}
-
 
 		@Override
-		public Integer snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
+		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
 			if (!semaphore.tryAcquire()) {
 				Assert.fail("Concurrent invocation of snapshotState.");
-			} else {
-				int startCount = count;
-				
-				if (startCount != count) {
-					semaphore.release();
-					// This means that next() was invoked while the snapshot was ongoing
-					Assert.fail("Count is different at start end end of snapshot.");
-				}
+			}
+			int startCount = count;
+			lastCheckpointId = checkpointId;
+
+			long sum = 0;
+			for (int i = 0; i < checkpointDelay; i++) {
+				sum += new Random().nextLong();
+			}
+
+			if (startCount != count) {
 				semaphore.release();
+				// This means that next() was invoked while the snapshot was ongoing
+				Assert.fail("Count is different at start end end of snapshot.");
 			}
-			return 0;
+			semaphore.release();
+			return sum;
 		}
 
 		@Override
-		public Integer restoreState(Integer stateSnapshot) {
-			return stateSnapshot;
+		public void restoreState(Serializable state) {
+
 		}
 	}
 
@@ -247,5 +230,45 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 			return true;
 		}
 	}
+
+	public static class OpenCloseTestSource extends RichSourceFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public void run(SourceContext<String> ctx) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			for (int i = 0; i < 10; i++) {
+				ctx.collect("Hello" + i);
+			}
+		}
+
+		@Override
+		public void cancel() {
+
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2092d83..df0c9ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
@@ -45,8 +45,6 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -54,6 +52,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.Future;
 
 import static org.junit.Assert.fail;
@@ -87,9 +86,9 @@ public class StreamMockEnvironment implements Environment {
 
 	private final int bufferSize;
 
-	public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
-		this.jobConfiguration = new Configuration();
-		this.taskConfiguration = new Configuration();
+	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		this.jobConfiguration = jobConfig;
+		this.taskConfiguration = taskConfig;
 		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<ResultPartitionWriter>();
 
@@ -101,20 +100,11 @@ public class StreamMockEnvironment implements Environment {
 		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
 	}
 
-	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
-		try {
-			final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
-
-			inputs.add(reader.getInputGate());
-
-			return reader;
-		}
-		catch (Throwable t) {
-			throw new RuntimeException("Error setting up mock readers: " + t.getMessage(), t);
-		}
+	public void addInputGate(InputGate gate) {
+		inputs.add(gate);
 	}
 
-	public <T> void addOutput(final List<T> outputList, final TypeSerializer<T> serializer) {
+	public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
 		try {
 			// The record-oriented writers wrap the buffer writer. We mock it
 			// to collect the returned buffers and deserialize the content to
@@ -161,6 +151,29 @@ public class StreamMockEnvironment implements Environment {
 				}
 			}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
 
+			// Add events to the output list
+			doAnswer(new Answer<Void>() {
+
+				@Override
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+
+					outputList.add(event);
+					return null;
+				}
+			}).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
+
+			doAnswer(new Answer<Void>() {
+
+				@Override
+				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+
+					outputList.add(event);
+					return null;
+				}
+			}).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
+
 			outputs.add(mockWriter);
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
deleted file mode 100644
index f1a36c8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-
-import java.util.List;
-
-
-public abstract class StreamTaskTestBase {
-
-	protected long memorySize = 0;
-
-	protected StreamMockEnvironment mockEnv;
-
-	public void initEnvironment(long memorySize, int bufferSize) {
-		this.memorySize = memorySize;
-		this.mockEnv = new StreamMockEnvironment(this.memorySize, new MockInputSplitProvider(), bufferSize);
-	}
-
-	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
-		final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);
-
-		return reader;
-	}
-
-	public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
-		final IteratorWrappingTestSingleInputGate<Record> reader = this.mockEnv.addInput(input);
-		TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
-		conf.addInputToGroup(groupId);
-		conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
-
-		if (read) {
-			reader.read();
-		}
-
-		return reader;
-	}
-
-	public <T> void addOutput(List<T> output, TypeSerializer<T> serializer) {
-		this.mockEnv.addOutput(output, serializer);
-		TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
-		conf.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		conf.setOutputSerializer(RecordSerializerFactory.get());
-	}
-
-	public Configuration getConfiguration() {
-		return this.mockEnv.getTaskConfiguration();
-	}
-
-	public StreamConfig getStreamConfig() {
-		return new StreamConfig(this.mockEnv.getTaskConfiguration());
-	}
-
-	public void registerTask(AbstractInvokable task) {
-		task.setEnvironment(this.mockEnv);
-		task.registerInputOutput();
-	}
-
-	public MemoryManager getMemoryManager() {
-		return this.mockEnv.getMemoryManager();
-	}
-
-	@After
-	public void shutdownIOManager() throws Exception {
-		this.mockEnv.getIOManager().shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
-	}
-
-	@After
-	public void shutdownMemoryManager() throws Exception {
-		if (this.memorySize > 0) {
-			MemoryManager memMan = getMemoryManager();
-			if (memMan != null) {
-				Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
-				memMan.shutdown();
-			}
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
new file mode 100644
index 0000000..a4cc0d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * Test harness for testing a {@link StreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved list.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to setup Powermock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class StreamTaskTestHarness<OUT> {
+
+	private static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+	private static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
+
+	protected long memorySize = 0;
+	protected int bufferSize = 0;
+
+	protected StreamMockEnvironment mockEnv;
+	protected ExecutionConfig executionConfig;
+	private Configuration jobConfig;
+	private Configuration taskConfig;
+	protected StreamConfig streamConfig;
+
+	private AbstractInvokable task;
+
+	private TypeInformation<OUT> outputType;
+	private TypeSerializer<OUT> outputSerializer;
+	private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+
+	private ConcurrentLinkedQueue outputList;
+
+	protected Thread taskThread;
+
+	// These don't get initialized, the one-input/two-input specific test harnesses
+	// must initialize these if they want to simulate input. We have them here so that all the
+	// input related methods only need to be implemented once, in generic form
+	protected int numInputGates;
+	protected int numInputChannelsPerGate;
+	protected StreamTestSingleInputGate[] inputGates;
+
+	public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
+		this.task = task;
+		this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
+		this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
+
+		this.jobConfig = new Configuration();
+		this.taskConfig = new Configuration();
+		this.executionConfig = new ExecutionConfig();
+		executionConfig.enableTimestamps();
+		try {
+			InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+		streamConfig = new StreamConfig(taskConfig);
+		streamConfig.setChainStart();
+		streamConfig.setBufferTimeout(0);
+
+		this.outputType = outputType;
+		outputSerializer = outputType.createSerializer(executionConfig);
+		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
+	}
+
+	/**
+	 * This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
+	 */
+	protected void initializeInputs() throws IOException, InterruptedException {
+	}
+
+	@SuppressWarnings("unchecked")
+	private void initializeOutput() {
+		outputList = new ConcurrentLinkedQueue();
+
+		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
+
+		streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
+		streamConfig.setNumberOfOutputs(1);
+
+		StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
+			private static final long serialVersionUID = 1L;
+		};
+
+		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+		StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+
+		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
+		streamConfig.setNonChainedOutputs(outEdgesInOrder);
+		streamConfig.setTypeSerializerOut1(outputSerializer);
+		streamConfig.setVertexID(0);
+
+	}
+
+	/**
+	 * Invoke the Task. This resets the output of any previous invocation. This will start a new
+	 * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
+	 * Task thread to finish running.
+	 */
+	public void invoke() throws Exception {
+		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+		task.setEnvironment(mockEnv);
+
+		initializeInputs();
+		initializeOutput();
+
+		task.registerInputOutput();
+
+		taskThread = new Thread(new Runnable() {
+			@Override
+			public void run() {
+
+
+
+				try {
+					task.invoke();
+					shutdownIOManager();
+					shutdownMemoryManager();
+				} catch (Exception e) {
+					throw new RuntimeException(e);
+				}
+
+			}
+		});
+
+		taskThread.start();
+	}
+
+	public void waitForTaskCompletion() throws InterruptedException {
+		if (taskThread == null) {
+			throw new IllegalStateException("Task thread was not started.");
+		}
+
+		taskThread.join();
+	}
+
+	/**
+	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
+	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}}
+	 * to extract only the StreamRecords.
+	 */
+	public Queue getOutput() {
+		return outputList;
+	}
+
+	public StreamConfig getStreamConfig() {
+		return streamConfig;
+	}
+
+	private void shutdownIOManager() throws Exception {
+		this.mockEnv.getIOManager().shutdown();
+		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
+	}
+
+	private void shutdownMemoryManager() throws Exception {
+		if (this.memorySize > 0) {
+			MemoryManager memMan = this.mockEnv.getMemoryManager();
+			if (memMan != null) {
+				Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
+				memMan.shutdown();
+			}
+		}
+	}
+
+	/**
+	 * Sends the element to input gate 0 on channel 0.
+	 */
+	@SuppressWarnings("unchecked")
+	public void processElement(Object element) {
+		inputGates[0].sendElement(element, 0);
+	}
+
+	/**
+	 * Sends the element to the specified channel on the specified input gate.
+	 */
+	@SuppressWarnings("unchecked")
+	public void processElement(Object element, int inputGate, int channel) {
+		inputGates[inputGate].sendElement(element, channel);
+	}
+
+	/**
+	 * Sends the event to input gate 0 on channel 0.
+	 */
+	public void processEvent(AbstractEvent event) {
+		inputGates[0].sendEvent(event, 0);
+	}
+
+	/**
+	 * Sends the event to the specified channel on the specified input gate.
+	 */
+	public void processEvent(AbstractEvent event, int inputGate, int channel) {
+		inputGates[inputGate].sendEvent(event, channel);
+	}
+
+	/**
+	 * This only returns after all input queues are empty.
+	 */
+	public void waitForInputProcessing() {
+
+
+		// first wait for all input queues to be empty
+		try {
+			Thread.sleep(1);
+		} catch (InterruptedException e) {
+		}
+		while (true) {
+			boolean allEmpty = true;
+			for (int i = 0; i < numInputGates; i++) {
+				if (!inputGates[i].allQueuesEmpty()) {
+					allEmpty = false;
+				}
+			}
+			try {
+				Thread.sleep(10);
+			} catch (InterruptedException e) {
+			}
+			if (allEmpty) {
+				break;
+			}
+		}
+
+		// then wait for the Task Thread to be in a blocked state
+		// Check whether the state is blocked, this should be the case if it cannot
+		// read more input, i.e. all currently available input has been processed.
+		while (true) {
+			Thread.State state = taskThread.getState();
+			if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
+					state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+				break;
+			}
+
+			try {
+				Thread.sleep(1);
+			} catch (InterruptedException e) {
+			}
+		}
+	}
+
+	/**
+	 * Notifies all input channels on all input gates that no more input will arrive. This
+	 * will usually make the Task exit from his internal loop.
+	 */
+	public void endInput() {
+		for (int i = 0; i < numInputGates; i++) {
+			inputGates[i].endInput();
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
new file mode 100644
index 0000000..3b113ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
+ * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
+ *
+ * <p>
+ * Note:<br>
+ * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
+ * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
+ * TwoInputStreamOperators.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+public class TwoInputStreamTaskTest {
+
+	/**
+	 * This test verifies that open() and close() are correctly called. This test also verifies
+	 * that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input
+	 * timestamp to emitted elements.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testOpenCloseAndTimestamps() throws Exception {
+		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
+		streamConfig.setStreamOperator(coMapOperator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.invoke();
+
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
+		expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
+
+		// wait until the input is processed to ensure ordering of the output
+		testHarness.waitForInputProcessing();
+
+		testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
+
+		expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	/**
+	 * This test verifies that watermarks are correctly forwarded. This also checks whether
+	 * watermarks are forwarded only when we have received watermarks from all inputs. The
+	 * forwarded watermark must be the minimum of the watermarks of all inputs.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testWatermarkForwarding() throws Exception {
+		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
+		streamConfig.setStreamOperator(coMapOperator);
+
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processElement(new Watermark(initialTime), 0, 0);
+		testHarness.processElement(new Watermark(initialTime), 0, 1);
+
+		testHarness.processElement(new Watermark(initialTime), 1, 0);
+
+
+		// now the output should still be empty
+		testHarness.waitForInputProcessing();
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.processElement(new Watermark(initialTime), 1, 1);
+
+		// now the watermark should have propagated, Map simply forward Watermarks
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// contrary to checkpoint barriers these elements are not blocked by watermarks
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
+		testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
+		expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
+		expectedOutput.add(new StreamRecord<String>("42", initialTime));
+
+		testHarness.waitForInputProcessing();
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
+		testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
+		testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
+		testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
+
+		// check whether we get the minimum of all the watermarks, this must also only occur in
+		// the output after the two StreamRecords
+		expectedOutput.add(new Watermark(initialTime + 2));
+		testHarness.waitForInputProcessing();
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+
+		// advance watermark from one of the inputs, now we should get a now one since the
+		// minimum increases
+		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 3));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		// advance the other two inputs, now we should get a new one since the
+		// minimum increases again
+		testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
+		testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
+		testHarness.waitForInputProcessing();
+		expectedOutput.add(new Watermark(initialTime + 4));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		Assert.assertEquals(2, resultElements.size());
+	}
+
+	/**
+	 * This test verifies that checkpoint barriers are correctly forwarded.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCheckpointBarriers() throws Exception {
+		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
+		streamConfig.setStreamOperator(coMapOperator);
+
+		Queue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+		// This element should be buffered since we received a checkpoint barrier on
+		// this input
+		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+
+		// This one should go through
+		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
+		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+
+		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
+		// on that input, only add to same input, otherwise we would not know the ordering
+		// of the output since the Task might read the inputs in any order
+		testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
+		testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
+		expectedOutput.add(new StreamRecord<String>("11", initialTime));
+		expectedOutput.add(new StreamRecord<String>("111", initialTime));
+
+		testHarness.waitForInputProcessing();
+		// we should not yet see the barrier, only the two elements from non-blocked input
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				testHarness.getOutput(),
+				expectedOutput);
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+		testHarness.waitForInputProcessing();
+
+		// now we should see the barrier and after that the buffered elements
+		expectedOutput.add(new CheckpointBarrier(0, 0));
+		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				testHarness.getOutput(),
+				expectedOutput);
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+		Assert.assertEquals(4, resultElements.size());
+	}
+
+	/**
+	 * This test verifies that checkpoint barriers and barrier buffers work correctly with
+	 * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
+	 * some inputs receive barriers from an earlier checkpoint, thereby blocking,
+	 * then all inputs receive barriers from a later checkpoint.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testOvertakingCheckpointBarriers() throws Exception {
+		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+		StreamConfig streamConfig = testHarness.getStreamConfig();
+		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
+		streamConfig.setStreamOperator(coMapOperator);
+
+		Queue expectedOutput = new ConcurrentLinkedQueue();
+		long initialTime = 0L;
+
+		testHarness.invoke();
+
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+		// These elements should be buffered until we receive barriers from
+		// all inputs
+		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
+		// on that input, only add to same input, otherwise we would not know the ordering
+		// of the output since the Task might read the inputs in any order
+		testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
+		testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
+		expectedOutput.add(new StreamRecord<String>("42", initialTime));
+		expectedOutput.add(new StreamRecord<String>("1337", initialTime));
+
+		testHarness.waitForInputProcessing();
+		// we should not yet see the barrier, only the two elements from non-blocked input
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+
+		// Now give a later barrier to all inputs, this should unblock the first channel,
+		// thereby allowing the two blocked elements through
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+
+		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+		expectedOutput.add(new CheckpointBarrier(1, 1));
+
+		testHarness.waitForInputProcessing();
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+
+
+		// Then give the earlier barrier, these should be ignored
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+		testHarness.waitForInputProcessing();
+
+
+		testHarness.endInput();
+
+		testHarness.waitForTaskCompletion();
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public String map1(String value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value;
+		}
+
+		@Override
+		public String map2(Integer value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value.toString();
+		}
+	}
+
+	private static class IdentityMap implements CoMapFunction<String, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(String value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public String map2(Integer value) throws Exception {
+
+			return value.toString();
+		}
+	}
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
new file mode 100644
index 0000000..f37eb66
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * Test harness for testing a {@link TwoInputStreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved list.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish. Use {@link #processElement}
+ * to send elements to the task. Use
+ * {@link #processEvent(org.apache.flink.runtime.event.task.AbstractEvent)} to send events to the task.
+ * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
+ * that data entry is finished.
+ *
+ * <p>
+ * When Elements or Events are offered to the Task they are put into a queue. The input gates
+ * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * queues are empty. This must be used after entering some elements before checking the
+ * desired output.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to setup Powermock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {
+
+	private TypeInformation<IN1> inputType1;
+	private TypeSerializer<IN1> inputSerializer1;
+
+	private TypeInformation<IN2> inputType2;
+	private TypeSerializer<IN2> inputSerializer2;
+
+	private int[] inputGateAssignment;
+
+	/**
+	 * Creates a test harness with the specified number of input gates and specified number
+	 * of channels per input gate. Parameter inputGateAssignment specifies for each gate whether
+	 * it should be assigned to the first (1), or second (2) input of the task.
+	 */
+	public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
+			int numInputGates,
+			int numInputChannelsPerGate,
+			int[] inputGateAssignment,
+			TypeInformation<IN1> inputType1,
+			TypeInformation<IN2> inputType2,
+			TypeInformation<OUT> outputType) {
+		super(task, outputType);
+
+		this.inputType1 = inputType1;
+		inputSerializer1 = inputType1.createSerializer(executionConfig);
+
+		this.inputType2 = inputType2;
+		inputSerializer2 = inputType2.createSerializer(executionConfig);
+
+		this.numInputGates = numInputGates;
+		this.numInputChannelsPerGate = numInputChannelsPerGate;
+		this.inputGateAssignment = inputGateAssignment;
+	}
+
+	/**
+	 * Creates a test harness with one input gate (that has one input channel) per input. The first
+	 * input gate is assigned to the first task input, the second input gate is assigned to the
+	 * second task input.
+	 */
+	public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
+			TypeInformation<IN1> inputType1,
+			TypeInformation<IN2> inputType2,
+			TypeInformation<OUT> outputType) {
+		this(task, 2, 1, new int[] {1, 2}, inputType1, inputType2, outputType);
+	}
+
+	@Override
+	protected void initializeInputs() throws IOException, InterruptedException {
+
+		inputGates = new StreamTestSingleInputGate[numInputGates];
+		List<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
+
+		StreamOperator<IN1> dummyOperator = new AbstractStreamOperator<IN1>() {
+			private static final long serialVersionUID = 1L;
+		};
+
+		StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+		StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+
+		for (int i = 0; i < numInputGates; i++) {
+
+			switch (inputGateAssignment[i]) {
+				case 1: {
+					inputGates[i] = new StreamTestSingleInputGate<IN1>(
+							numInputChannelsPerGate,
+							bufferSize,
+							inputSerializer1);
+
+
+					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
+							targetVertexDummy,
+							1,
+							new LinkedList<String>(),
+							new BroadcastPartitioner<Object>());
+
+					inPhysicalEdges.add(streamEdge);
+					break;
+				}
+				case 2: {
+					inputGates[i] = new StreamTestSingleInputGate<IN2>(
+							numInputChannelsPerGate,
+							bufferSize,
+							inputSerializer2);
+
+					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
+							targetVertexDummy,
+							2,
+							new LinkedList<String>(),
+							new BroadcastPartitioner<Object>());
+
+					inPhysicalEdges.add(streamEdge);
+					break;
+				}
+				default:
+					throw new IllegalStateException("Wrong input gate assignment.");
+			}
+
+			this.mockEnv.addInputGate(inputGates[i].getInputGate());
+		}
+
+		streamConfig.setInPhysicalEdges(inPhysicalEdges);
+		streamConfig.setNumberOfInputs(numInputGates);
+		streamConfig.setTypeSerializerIn1(inputSerializer1);
+		streamConfig.setTypeSerializerIn2(inputSerializer2);
+	}
+
+}
+