You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:14 UTC

[4/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
new file mode 100644
index 0000000..8499aa2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.functions.RichFoldFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamGroupedFold}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamGroupedFoldTest {
+
+	private static class MyFolder implements FoldFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String fold(String accumulator, Integer value) throws Exception {
+			return accumulator + value.toString();
+		}
+
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testGroupedFold() throws Exception {
+		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
+
+		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(
+				new MyFolder(), new KeySelector<Integer, String>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public String getKey(Integer value) throws Exception {
+				return value.toString();
+			}
+		}, "100", outType);
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
+		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+
+		expectedOutput.add(new StreamRecord<String>("1001", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("10011", initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("1002", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("10022", initialTime + 4));
+		expectedOutput.add(new StreamRecord<String>("1003", initialTime + 5));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		StreamGroupedFold<Integer, String> operator = new StreamGroupedFold<Integer, String>(new TestOpenCloseFoldFunction(), new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		}, "init", BasicTypeInfo.STRING_TYPE_INFO);
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFoldFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseFoldFunction extends RichFoldFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public String fold(String acc, Integer in) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return acc + in;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
new file mode 100644
index 0000000..dca1cbb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamGroupedReduce}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+
+public class StreamGroupedReduceTest {
+
+	private static class MyReducer implements ReduceFunction<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Integer reduce(Integer value1, Integer value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testGroupedReduce() throws Exception {
+		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new MyReducer(), new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		});
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 3));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 4));
+		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 5));
+
+		expectedOutput.add(new StreamRecord<Integer>(1, initialTime + 1));
+		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 3));
+		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
+		expectedOutput.add(new StreamRecord<Integer>(3, initialTime + 5));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		StreamGroupedReduce<Integer> operator = new StreamGroupedReduce<Integer>(new TestOpenCloseReduceFunction(), new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		});
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseReduceFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseReduceFunction extends RichReduceFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public Integer reduce(Integer in1, Integer in2) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return in1 + in2;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
new file mode 100644
index 0000000..d5f2f62
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamMap}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamMapTest {
+
+	private static class Map implements MapFunction<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map(Integer value) throws Exception {
+			return "+" + (value + 1);
+		}
+	}
+	
+	@Test
+	public void testMap() throws Exception {
+		StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness = new OneInputStreamOperatorTestHarness<Integer, String>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
+
+		expectedOutput.add(new StreamRecord<String>("+2", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("+3", initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("+4", initialTime + 3));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		StreamMap<String, String> operator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
+
+		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public String map(String value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
new file mode 100644
index 0000000..ede7db5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.streaming.api.datastream.StreamProjection;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamProject}. These test that:
+ *
+ * <ul>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamProjectTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	@Test
+	public void testProject() throws Exception {
+
+		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
+				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
+
+		int[] fields = new int[]{4, 4, 3};
+
+		TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
+				new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
+						.createSerializer(new ExecutionConfig());
+		@SuppressWarnings("unchecked")
+		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
+				new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
+						fields, serializer);
+
+		OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> testHarness = new OneInputStreamOperatorTestHarness<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4), initialTime + 1));
+		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2), initialTime + 2));
+		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2), initialTime + 3));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Tuple5<Integer, String, Integer, String, Integer>>(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7), initialTime + 4));
+
+		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(4, 4, "b"), initialTime + 1));
+		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 2));
+		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(2, 2, "c"), initialTime + 3));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<Tuple3<Integer, Integer, String>>(new Tuple3<Integer, Integer, String>(7, 7, "a"), initialTime + 4));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+
+	// tests using projection from the API without explicitly specifying the types
+	private static final long MEMORY_SIZE = 32;
+	private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
+	private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
+
+	@Test
+	public void APIWithoutTypesTest() {
+
+		for (Long i = 1L; i < 11L; i++) {
+			expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
+		}
+
+		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
+
+		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
+				@Override
+				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
+					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
+				}
+			})
+			.project(0, 2)
+			.addSink(new SinkFunction<Tuple>() {
+				@Override
+				@SuppressWarnings("unchecked")
+				public void invoke(Tuple value) throws Exception {
+					actual.add( (Tuple2<Long,Double>) value);
+				}
+			});
+
+		try {
+			env.execute();
+		} catch (Exception e) {
+			fail(e.getMessage());
+		}
+
+		assertEquals(expected, actual);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
deleted file mode 100644
index 7f23e23..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoFlatMapTest.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoFlatMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(String value, Collector<String> coll) {
-			for (int i = 0; i < value.length(); i++) {
-				coll.collect(value.substring(i, i + 1));
-			}
-		}
-
-		@Override
-		public void flatMap2(Integer value, Collector<String> coll) {
-			coll.collect(value.toString());
-		}
-	}
-
-	@Test
-	public void coFlatMapTest() {
-		CoStreamFlatMap<String, Integer, String> invokable = new CoStreamFlatMap<String, Integer, String>(
-				new MyCoFlatMap());
-
-		List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
-				"e", "3", "4", "5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
-
-		assertEquals(expectedList, actualList);
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void multipleInputTest() {
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
-		DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
-		
-		try {
-			ds1.forward().union(ds2);
-			fail();
-		} catch (RuntimeException e) {
-			// expected
-		}
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
index d01d0d3..39e85e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoGroupedReduceTest.java
@@ -1,125 +1,125 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedReduceTest {
-
-	private final static class MyCoReduceFunction implements
-			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
-				Tuple3<String, String, String> value2) {
-			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
-		}
-
-		@Override
-		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
-				Tuple2<Integer, Integer> value2) {
-			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
-		}
-
-		@Override
-		public String map1(Tuple3<String, String, String> value) {
-			return value.f1;
-		}
-
-		@Override
-		public String map2(Tuple2<Integer, Integer> value) {
-			return value.f1.toString();
-		}
-	}
-
-	@SuppressWarnings("unchecked")
-	@Test
-	public void coGroupedReduceTest() {
-		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
-		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
-		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
-		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
-		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
-		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
-		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
-		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-				return value.f0;
-			}
-		};
-
-		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Tuple3<String, String, String> value) throws Exception {
-				return value.f2;
-			}
-		};
-
-		CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector0, keySelector1);
-
-		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
-				"7");
-
-		List<String> actualList = MockCoContext.createAndExecute(invokable,
-				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-
-		invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
-				new MyCoReduceFunction(), keySelector2, keySelector1);
-
-		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-
-		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
-				Arrays.asList(int1, int2, int3, int4, int5));
-
-		assertEquals(expected, actualList);
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.api.operators.co;
+//
+//import static org.junit.Assert.assertEquals;
+//
+//import java.util.Arrays;
+//import java.util.List;
+//
+//import org.apache.flink.api.java.functions.KeySelector;
+//import org.apache.flink.api.java.tuple.Tuple2;
+//import org.apache.flink.api.java.tuple.Tuple3;
+//import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+//import org.apache.flink.streaming.api.operators.co.CoStreamGroupedReduce;
+//import org.apache.flink.streaming.util.MockCoContext;
+//import org.junit.Test;
+//
+//public class CoGroupedReduceTest {
+//
+//	private final static class MyCoReduceFunction implements
+//			CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
+//				Tuple3<String, String, String> value2) {
+//			return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
+//		}
+//
+//		@Override
+//		public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
+//				Tuple2<Integer, Integer> value2) {
+//			return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
+//		}
+//
+//		@Override
+//		public String map1(Tuple3<String, String, String> value) {
+//			return value.f1;
+//		}
+//
+//		@Override
+//		public String map2(Tuple2<Integer, Integer> value) {
+//			return value.f1.toString();
+//		}
+//	}
+//
+//	@SuppressWarnings("unchecked")
+//	@Test
+//	public void coGroupedReduceTest() {
+//		Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
+//		Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
+//		Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
+//		Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
+//		Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
+//		Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
+//		Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
+//		Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
+//
+//		KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
+//
+//			private static final long serialVersionUID = 1L;
+//
+//			@Override
+//			public String getKey(Tuple3<String, String, String> value) throws Exception {
+//				return value.f0;
+//			}
+//		};
+//
+//		KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+//
+//			private static final long serialVersionUID = 1L;
+//
+//			@Override
+//			public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+//				return value.f0;
+//			}
+//		};
+//
+//		KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
+//
+//			private static final long serialVersionUID = 1L;
+//
+//			@Override
+//			public String getKey(Tuple3<String, String, String> value) throws Exception {
+//				return value.f2;
+//			}
+//		};
+//
+//		CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+//				new MyCoReduceFunction(), keySelector0, keySelector1);
+//
+//		List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+//				"7");
+//
+//		List<String> actualList = MockCoContext.createAndExecute(invokable,
+//				Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+//
+//		assertEquals(expected, actualList);
+//
+//		invokable = new CoStreamGroupedReduce<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+//				new MyCoReduceFunction(), keySelector2, keySelector1);
+//
+//		expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
+//
+//		actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
+//				Arrays.asList(int1, int2, int3, int4, int5));
+//
+//		assertEquals(expected, actualList);
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
deleted file mode 100644
index 2a2560d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoMapTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(Double value) {
-			return value.toString();
-		}
-
-		@Override
-		public String map2(Integer value) {
-			return value.toString();
-		}
-	}
-
-	@Test
-	public void coMapTest() {
-		CoStreamMap<Double, Integer, String> invokable = new CoStreamMap<Double, Integer, String>(new MyCoMap());
-
-		List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
-		List<String> actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
-		
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
new file mode 100644
index 0000000..2c9ba5c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import static org.junit.Assert.fail;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link CoStreamFlatMap}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class CoStreamFlatMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap1(String value, Collector<String> coll) {
+			for (int i = 0; i < value.length(); i++) {
+				coll.collect(value.substring(i, i + 1));
+			}
+		}
+
+		@Override
+		public void flatMap2(Integer value, Collector<String> coll) {
+			coll.collect(value.toString());
+		}
+	}
+
+	@Test
+	public void testCoFlatMap() throws Exception {
+		CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new MyCoFlatMap());
+
+		TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<String>("abc", initialTime + 1));
+		testHarness.processElement1(new StreamRecord<String>("def", initialTime + 2));
+		testHarness.processWatermark1(new Watermark(initialTime + 2));
+		testHarness.processElement1(new StreamRecord<String>("ghi", initialTime + 3));
+
+		testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
+		testHarness.processWatermark2(new Watermark(initialTime + 3));
+		testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
+		testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
+		testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
+
+		expectedOutput.add(new StreamRecord<String>("a", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("b", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("c", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("d", initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("e", initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("f", initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("g", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("h", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("i", initialTime + 3));
+
+		expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
+		expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		CoStreamFlatMap<String, Integer, String> operator = new CoStreamFlatMap<String, Integer, String>(new TestOpenCloseCoFlatMapFunction());
+
+		TwoInputStreamOperatorTestHarness<String, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<String, Integer, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<String>("Hello", initialTime));
+		testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoFlatMapFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseCoFlatMapFunction extends RichCoFlatMapFunction<String, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public void flatMap1(String value, Collector<String> out) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			out.collect(value);
+		}
+
+		@Override
+		public void flatMap2(Integer value, Collector<String> out) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			out.collect(value.toString());
+		}
+
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void multipleInputTest() {
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
+		DataStream<Integer> ds2 = env.fromElements(2, 4).union(ds1);
+		
+		try {
+			ds1.forward().union(ds2);
+			fail();
+		} catch (RuntimeException e) {
+			// expected
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
new file mode 100644
index 0000000..dcf4972
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamMapTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.api.operators.co.CoStreamMap}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class CoStreamMapTest implements Serializable {
+	private static final long serialVersionUID = 1L;
+
+	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String map1(Double value) {
+			return value.toString();
+		}
+
+		@Override
+		public String map2(Integer value) {
+			return value.toString();
+		}
+	}
+
+
+	@Test
+	public void testCoMap() throws Exception {
+		CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new MyCoMap());
+
+		TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<Double>(1.1d, initialTime + 1));
+		testHarness.processElement1(new StreamRecord<Double>(1.2d, initialTime + 2));
+		testHarness.processElement1(new StreamRecord<Double>(1.3d, initialTime + 3));
+		testHarness.processWatermark1(new Watermark(initialTime + 3));
+		testHarness.processElement1(new StreamRecord<Double>(1.4d, initialTime + 4));
+		testHarness.processElement1(new StreamRecord<Double>(1.5d, initialTime + 5));
+
+		testHarness.processElement2(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement2(new StreamRecord<Integer>(2, initialTime + 2));
+		testHarness.processWatermark2(new Watermark(initialTime + 2));
+		testHarness.processElement2(new StreamRecord<Integer>(3, initialTime + 3));
+		testHarness.processElement2(new StreamRecord<Integer>(4, initialTime + 4));
+		testHarness.processElement2(new StreamRecord<Integer>(5, initialTime + 5));
+
+		expectedOutput.add(new StreamRecord<String>("1.1", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("1.2", initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("1.3", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("1.4", initialTime + 4));
+		expectedOutput.add(new StreamRecord<String>("1.5", initialTime + 5));
+
+		expectedOutput.add(new StreamRecord<String>("1", initialTime + 1));
+		expectedOutput.add(new StreamRecord<String>("2", initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<String>("3", initialTime + 3));
+		expectedOutput.add(new StreamRecord<String>("4", initialTime + 4));
+		expectedOutput.add(new StreamRecord<String>("5", initialTime + 5));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		CoStreamMap<Double, Integer, String> operator = new CoStreamMap<Double, Integer, String>(new TestOpenCloseCoMapFunction());
+
+		TwoInputStreamOperatorTestHarness<Double, Integer, String> testHarness = new TwoInputStreamOperatorTestHarness<Double, Integer, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<Double>(74d, initialTime));
+		testHarness.processElement2(new StreamRecord<Integer>(42, initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseCoMapFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseCoMapFunction extends RichCoMapFunction<Double, Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public String map1(Double value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value.toString();
+		}
+
+		@Override
+		public String map2(Integer value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value.toString();
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
index c0f49c7..130842e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoWindowTest.java
@@ -1,182 +1,182 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
-import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoWindowTest {
-
-	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@SuppressWarnings("unused")
-		@Override
-		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-				throws Exception {
-			Integer count1 = 0;
-			for (Integer i : first) {
-				count1++;
-			}
-			Integer count2 = 0;
-			for (Integer i : second) {
-				count2++;
-			}
-			out.collect(count1);
-			out.collect(count2);
-
-		}
-
-	}
-
-	public static final class MyCoGroup2 implements
-			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coWindow(List<Tuple2<Integer, Integer>> first,
-				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-
-			Set<Integer> firstElements = new HashSet<Integer>();
-			for (Tuple2<Integer, Integer> value : first) {
-				firstElements.add(value.f1);
-			}
-			for (Tuple2<Integer, Integer> value : second) {
-				if (firstElements.contains(value.f1)) {
-					out.collect(value.f1);
-				}
-			}
-
-		}
-
-	}
-
-	private static final class MyTS1 implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value;
-		}
-
-	}
-
-	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Tuple2<Integer, Integer> value) {
-			return value.f0;
-		}
-
-	}
-
-	@Test
-	public void coWindowGroupReduceTest2() throws Exception {
-
-		CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>(
-				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
-				new TimestampWrapper<Integer>(new MyTS1(), 1));
-
-		// Windowsize 2, slide 1
-		// 1,2|2,3|3,4|4,5
-
-		List<Integer> input11 = new ArrayList<Integer>();
-		input11.add(1);
-		input11.add(1);
-		input11.add(2);
-		input11.add(3);
-		input11.add(3);
-
-		List<Integer> input12 = new ArrayList<Integer>();
-		input12.add(1);
-		input12.add(2);
-		input12.add(3);
-		input12.add(3);
-		input12.add(5);
-
-		// Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
-		// expected output: 3,2|3,3|2,2|0,1
-
-		List<Integer> expected1 = new ArrayList<Integer>();
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(3);
-		expected1.add(3);
-		expected1.add(2);
-		expected1.add(2);
-		expected1.add(0);
-		expected1.add(1);
-
-		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
-		assertEquals(expected1, actual1);
-
-		CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
-						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-
-		// WindowSize 2, slide 3
-		// 1,2|4,5|7,8|
-
-		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
-		input21.add(new Tuple2<Integer, Integer>(1, 1));
-		input21.add(new Tuple2<Integer, Integer>(1, 2));
-		input21.add(new Tuple2<Integer, Integer>(2, 3));
-		input21.add(new Tuple2<Integer, Integer>(3, 4));
-		input21.add(new Tuple2<Integer, Integer>(3, 5));
-		input21.add(new Tuple2<Integer, Integer>(4, 6));
-		input21.add(new Tuple2<Integer, Integer>(4, 7));
-		input21.add(new Tuple2<Integer, Integer>(5, 8));
-
-		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
-		input22.add(new Tuple2<Integer, Integer>(1, 1));
-		input22.add(new Tuple2<Integer, Integer>(2, 0));
-		input22.add(new Tuple2<Integer, Integer>(2, 2));
-		input22.add(new Tuple2<Integer, Integer>(3, 9));
-		input22.add(new Tuple2<Integer, Integer>(3, 4));
-		input22.add(new Tuple2<Integer, Integer>(4, 10));
-		input22.add(new Tuple2<Integer, Integer>(5, 8));
-		input22.add(new Tuple2<Integer, Integer>(5, 7));
-
-		List<Integer> expected2 = new ArrayList<Integer>();
-		expected2.add(1);
-		expected2.add(2);
-		expected2.add(8);
-		expected2.add(7);
-
-		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
-		assertEquals(expected2, actual2);
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one or more
+// * contributor license agreements.  See the NOTICE file distributed with
+// * this work for additional information regarding copyright ownership.
+// * The ASF licenses this file to You under the Apache License, Version 2.0
+// * (the "License"); you may not use this file except in compliance with
+// * the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.streaming.api.operators.co;
+//
+//import static org.junit.Assert.assertEquals;
+//
+//import java.util.ArrayList;
+//import java.util.HashSet;
+//import java.util.List;
+//import java.util.Set;
+//
+//import org.apache.flink.api.java.tuple.Tuple2;
+//import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
+//import org.apache.flink.streaming.api.operators.co.CoStreamWindow;
+//import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+//import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+//import org.apache.flink.streaming.util.MockCoContext;
+//import org.apache.flink.util.Collector;
+//import org.junit.Test;
+//
+//public class CoWindowTest {
+//
+//	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@SuppressWarnings("unused")
+//		@Override
+//		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
+//				throws Exception {
+//			Integer count1 = 0;
+//			for (Integer i : first) {
+//				count1++;
+//			}
+//			Integer count2 = 0;
+//			for (Integer i : second) {
+//				count2++;
+//			}
+//			out.collect(count1);
+//			out.collect(count2);
+//
+//		}
+//
+//	}
+//
+//	public static final class MyCoGroup2 implements
+//			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public void coWindow(List<Tuple2<Integer, Integer>> first,
+//				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
+//
+//			Set<Integer> firstElements = new HashSet<Integer>();
+//			for (Tuple2<Integer, Integer> value : first) {
+//				firstElements.add(value.f1);
+//			}
+//			for (Tuple2<Integer, Integer> value : second) {
+//				if (firstElements.contains(value.f1)) {
+//					out.collect(value.f1);
+//				}
+//			}
+//
+//		}
+//
+//	}
+//
+//	private static final class MyTS1 implements Timestamp<Integer> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public long getTimestamp(Integer value) {
+//			return value;
+//		}
+//
+//	}
+//
+//	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
+//
+//		private static final long serialVersionUID = 1L;
+//
+//		@Override
+//		public long getTimestamp(Tuple2<Integer, Integer> value) {
+//			return value.f0;
+//		}
+//
+//	}
+//
+//	@Test
+//	public void coWindowGroupReduceTest2() throws Exception {
+//
+//		CoStreamWindow<Integer, Integer, Integer> invokable1 = new CoStreamWindow<Integer, Integer, Integer>(
+//				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
+//				new TimestampWrapper<Integer>(new MyTS1(), 1));
+//
+//		// Windowsize 2, slide 1
+//		// 1,2|2,3|3,4|4,5
+//
+//		List<Integer> input11 = new ArrayList<Integer>();
+//		input11.add(1);
+//		input11.add(1);
+//		input11.add(2);
+//		input11.add(3);
+//		input11.add(3);
+//
+//		List<Integer> input12 = new ArrayList<Integer>();
+//		input12.add(1);
+//		input12.add(2);
+//		input12.add(3);
+//		input12.add(3);
+//		input12.add(5);
+//
+//		// Windows: (1,1,2)(1,1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|(5)(5)
+//		// expected output: 3,2|3,3|2,2|0,1
+//
+//		List<Integer> expected1 = new ArrayList<Integer>();
+//		expected1.add(3);
+//		expected1.add(2);
+//		expected1.add(3);
+//		expected1.add(3);
+//		expected1.add(2);
+//		expected1.add(2);
+//		expected1.add(0);
+//		expected1.add(1);
+//
+//		List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
+//		assertEquals(expected1, actual1);
+//
+//		CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoStreamWindow<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
+//				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
+//						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
+//
+//		// WindowSize 2, slide 3
+//		// 1,2|4,5|7,8|
+//
+//		List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
+//		input21.add(new Tuple2<Integer, Integer>(1, 1));
+//		input21.add(new Tuple2<Integer, Integer>(1, 2));
+//		input21.add(new Tuple2<Integer, Integer>(2, 3));
+//		input21.add(new Tuple2<Integer, Integer>(3, 4));
+//		input21.add(new Tuple2<Integer, Integer>(3, 5));
+//		input21.add(new Tuple2<Integer, Integer>(4, 6));
+//		input21.add(new Tuple2<Integer, Integer>(4, 7));
+//		input21.add(new Tuple2<Integer, Integer>(5, 8));
+//
+//		List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
+//		input22.add(new Tuple2<Integer, Integer>(1, 1));
+//		input22.add(new Tuple2<Integer, Integer>(2, 0));
+//		input22.add(new Tuple2<Integer, Integer>(2, 2));
+//		input22.add(new Tuple2<Integer, Integer>(3, 9));
+//		input22.add(new Tuple2<Integer, Integer>(3, 4));
+//		input22.add(new Tuple2<Integer, Integer>(4, 10));
+//		input22.add(new Tuple2<Integer, Integer>(5, 8));
+//		input22.add(new Tuple2<Integer, Integer>(5, 7));
+//
+//		List<Integer> expected2 = new ArrayList<Integer>();
+//		expected2.add(1);
+//		expected2.add(2);
+//		expected2.add(8);
+//		expected2.add(7);
+//
+//		List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
+//		assertEquals(expected2, actual2);
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
index c8b0ae3..f111890 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/ParallelMergeTest.java
@@ -25,10 +25,9 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.operators.windowing.ParallelGroupedMerge;
-import org.apache.flink.streaming.api.operators.windowing.ParallelMerge;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
-import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestCollector;
+import org.apache.flink.streaming.api.windowing.windowbuffer.BasicWindowBufferTest.TestOutput;
 import org.junit.Test;
 
 public class ParallelMergeTest {
@@ -45,37 +44,38 @@ public class ParallelMergeTest {
 			}
 		};
 
-		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> output = out.getCollected();
+		TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
+		TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
+		List<StreamWindow<Integer>> result = output.getCollected();
 
 		ParallelMerge<Integer> merger = new ParallelMerge<Integer>(reducer);
 		merger.numberOfDiscretizers = 2;
 
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertTrue(output.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertEquals(StreamWindow.fromElements(2), output.get(0));
-
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), out);
-		merger.flatMap1(createTestWindow(2), out);
-		merger.flatMap1(createTestWindow(2), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), out);
-		assertEquals(1, output.size());
-		merger.flatMap1(createTestWindow(2), out);
-		assertEquals(StreamWindow.fromElements(3), output.get(1));
+		merger.flatMap1(createTestWindow(1), collector);
+		merger.flatMap1(createTestWindow(1), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
+		assertTrue(result.isEmpty());
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
+		assertEquals(StreamWindow.fromElements(2), result.get(0));
+
+		merger.flatMap2(new Tuple2<Integer, Integer>(2, 2), collector);
+		merger.flatMap1(createTestWindow(2), collector);
+		merger.flatMap1(createTestWindow(2), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(2, 1), collector);
+		assertEquals(1, result.size());
+		merger.flatMap1(createTestWindow(2), collector);
+		assertEquals(StreamWindow.fromElements(3), result.get(1));
 
 		// check error handling
-		merger.flatMap1(createTestWindow(3), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), out);
+		merger.flatMap1(createTestWindow(3), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(3, 1), collector);
 
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), out);
-		merger.flatMap1(createTestWindow(4), out);
+		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(4, 1), collector);
+		merger.flatMap1(createTestWindow(4), collector);
 		try {
-			merger.flatMap1(createTestWindow(4), out);
+			merger.flatMap1(createTestWindow(4), collector);
 			fail();
 		} catch (RuntimeException e) {
 			// Do nothing
@@ -83,12 +83,12 @@ public class ParallelMergeTest {
 
 		ParallelMerge<Integer> merger2 = new ParallelMerge<Integer>(reducer);
 		merger2.numberOfDiscretizers = 2;
-		merger2.flatMap1(createTestWindow(0), out);
-		merger2.flatMap1(createTestWindow(1), out);
-		merger2.flatMap1(createTestWindow(1), out);
-		merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+		merger2.flatMap1(createTestWindow(0), collector);
+		merger2.flatMap1(createTestWindow(1), collector);
+		merger2.flatMap1(createTestWindow(1), collector);
+		merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
 		try {
-			merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
+			merger2.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
 			fail();
 		} catch (RuntimeException e) {
 			// Do nothing
@@ -99,18 +99,19 @@ public class ParallelMergeTest {
 	@Test
 	public void groupedTest() throws Exception {
 
-		TestCollector<StreamWindow<Integer>> out = new TestCollector<StreamWindow<Integer>>();
-		List<StreamWindow<Integer>> output = out.getCollected();
+		TestOutput<StreamWindow<Integer>> output = new TestOutput<StreamWindow<Integer>>();
+		TimestampedCollector<StreamWindow<Integer>> collector = new TimestampedCollector<StreamWindow<Integer>>(output);
+		List<StreamWindow<Integer>> result = output.getCollected();
 
 		ParallelMerge<Integer> merger = new ParallelGroupedMerge<Integer>();
 		merger.numberOfDiscretizers = 2;
 
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap1(createTestWindow(1), out);
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertTrue(output.isEmpty());
-		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), out);
-		assertEquals(StreamWindow.fromElements(1, 1), output.get(0));
+		merger.flatMap1(createTestWindow(1), collector);
+		merger.flatMap1(createTestWindow(1), collector);
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
+		assertTrue(result.isEmpty());
+		merger.flatMap2(new Tuple2<Integer, Integer>(1, 1), collector);
+		assertEquals(StreamWindow.fromElements(1, 1), result.get(0));
 	}
 
 	private StreamWindow<Integer> createTestWindow(Integer id) {