You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 17:45:10 UTC

[1/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process

Repository: flink
Updated Branches:
  refs/heads/master 3a27f55cf -> 910f733f5


http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
new file mode 100644
index 0000000..74fd044
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link ProcessOperator}.
+ */
+public class ProcessOperatorTest extends TestLogger {
+
+	@Test
+	public void testTimestampAndWatermarkQuerying() throws Exception {
+
+		ProcessOperator<Integer, Integer, String> operator =
+				new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+		ProcessOperator<Integer, Integer, String> operator =
+				new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		ProcessOperator<Integer, Integer, Integer> operator =
+				new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(0));
+
+		testHarness.processElement(new StreamRecord<>(17, 42L));
+
+		testHarness.processWatermark(new Watermark(5));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(0L));
+		expectedOutput.add(new StreamRecord<>(17, 42L));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+		expectedOutput.add(new Watermark(5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		ProcessOperator<Integer, Integer, Integer> operator =
+				new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(17));
+
+		testHarness.setProcessingTime(5);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>(17));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		ProcessOperator<Integer, Integer, String> operator =
+				new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(1));
+		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark(new Watermark(2));
+		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+		testHarness.processWatermark(new Watermark(6));
+		testHarness.processWatermark(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		ProcessOperator<Integer, Integer, String> operator =
+				new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		ProcessOperator<Integer, Integer, String> operator =
+				new ProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		// snapshot and restore from scratch
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		testHarness.close();
+
+		operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		System.out.println("GOT: " + testHarness.getOutput());
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public QueryingFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+			} else {
+				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+			out.collect(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<Integer> out) throws Exception {
+
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect(1777);
+		}
+	}
+
+	private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Integer> state =
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE,  null);
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT:" + value);
+			getRuntimeContext().getState(state).update(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerProcessingTimeTimer(5);
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
deleted file mode 100644
index 6080ddc..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link StreamTimelyFlatMap}.
- */
-public class TimelyFlatMapTest extends TestLogger {
-
-	@Test
-	public void testTimestampAndWatermarkQuerying() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(17));
-		testHarness.processElement(new StreamRecord<>(5, 12L));
-
-		testHarness.processWatermark(new Watermark(42));
-		testHarness.processElement(new StreamRecord<>(6, 13L));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(17L));
-		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
-		expectedOutput.add(new Watermark(42L));
-		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(17);
-		testHarness.processElement(new StreamRecord<>(5));
-
-		testHarness.setProcessingTime(42);
-		testHarness.processElement(new StreamRecord<>(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
-		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testEventTimeTimers() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(0));
-
-		testHarness.processElement(new StreamRecord<>(17, 42L));
-
-		testHarness.processWatermark(new Watermark(5));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(0L));
-		expectedOutput.add(new StreamRecord<>(17, 42L));
-		expectedOutput.add(new StreamRecord<>(1777, 5L));
-		expectedOutput.add(new Watermark(5L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testProcessingTimeTimers() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(17));
-
-		testHarness.setProcessingTime(5);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>(17));
-		expectedOutput.add(new StreamRecord<>(1777, 5L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testEventTimeTimerWithState() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(1));
-		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
-
-		testHarness.processWatermark(new Watermark(2));
-		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
-
-		testHarness.processWatermark(new Watermark(6));
-		testHarness.processWatermark(new Watermark(7));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(1L));
-		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
-		expectedOutput.add(new Watermark(2L));
-		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new Watermark(6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-		expectedOutput.add(new Watermark(7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testProcessingTimeTimerWithState() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(1);
-		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
-
-		testHarness.setProcessingTime(2);
-		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
-
-		testHarness.setProcessingTime(6);
-		testHarness.setProcessingTime(7);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testSnapshotAndRestore() throws Exception {
-
-		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(5, 12L));
-
-		// snapshot and restore from scratch
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-		testHarness.close();
-
-		operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-
-		testHarness.setProcessingTime(5);
-		testHarness.processWatermark(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-		expectedOutput.add(new Watermark(6));
-
-		System.out.println("GOT: " + testHarness.getOutput());
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public T getKey(T value) throws Exception {
-			return value;
-		}
-	}
-
-	private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final TimeDomain timeDomain;
-
-		public QueryingFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
-		}
-
-		@Override
-		public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-			} else {
-				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-			}
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-		}
-	}
-
-	private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final TimeDomain timeDomain;
-
-		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
-		}
-
-		@Override
-		public void flatMap(Integer value, Context ctx, Collector<Integer> out) throws Exception {
-			out.collect(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-			} else {
-				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-			}
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<Integer> out) throws Exception {
-
-			assertEquals(this.timeDomain, ctx.timeDomain());
-			out.collect(1777);
-		}
-	}
-
-	private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<Integer> state =
-				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE,  null);
-
-		private final TimeDomain timeDomain;
-
-		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
-		}
-
-		@Override
-		public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT:" + value);
-			getRuntimeContext().getState(state).update(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-			} else {
-				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-			}
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(this.timeDomain, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerProcessingTimeTimer(5);
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-				out.collect("EVENT:1777");
-			} else {
-				out.collect("PROC:1777");
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
new file mode 100644
index 0000000..a449359
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link CoProcessOperator}.
+ */
+public class CoProcessOperatorTest extends TestLogger {
+
+	@Test
+	public void testTimestampAndWatermarkQuerying() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new WatermarkQueryingProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(17));
+		testHarness.processWatermark2(new Watermark(17));
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark1(new Watermark(42));
+		testHarness.processWatermark2(new Watermark(42));
+		testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement1(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement2(new StreamRecord<>("6"));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+		expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new EventTimeTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17, 42L));
+		testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+		testHarness.processWatermark1(new Watermark(5));
+		testHarness.processWatermark2(new Watermark(5));
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new Watermark(5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+		expectedOutput.add(new Watermark(6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17));
+		testHarness.processElement2(new StreamRecord<>("18"));
+
+		testHarness.setProcessingTime(5);
+		testHarness.setProcessingTime(6);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(1));
+		testHarness.processWatermark2(new Watermark(1));
+		testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark1(new Watermark(2));
+		testHarness.processWatermark2(new Watermark(2));
+		testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		testHarness.processWatermark1(new Watermark(7));
+		testHarness.processWatermark2(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		CoProcessOperator<String, Integer, String, String> operator =
+				new CoProcessOperator<>(new BothTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+		testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+		// snapshot and restore from scratch
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		testHarness.close();
+
+		operator = new CoProcessOperator<>(new BothTriggeringProcessFunction());
+
+		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+				operator,
+				new IntToStringKeySelector<>(),
+				new IdentityKeySelector<String>(),
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+
+	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Integer value) throws Exception {
+			return "" + value;
+		}
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			ctx.timerService().registerEventTimeTimer(5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			ctx.timerService().registerProcessingTimeTimer(5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			ctx.timerService().registerProcessingTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerProcessingTimeTimer(5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
deleted file mode 100644
index 7c29631..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators.co;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link CoStreamTimelyFlatMap}.
- */
-public class TimelyCoFlatMapTest extends TestLogger {
-
-	@Test
-	public void testTimestampAndWatermarkQuerying() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark1(new Watermark(17));
-		testHarness.processWatermark2(new Watermark(17));
-		testHarness.processElement1(new StreamRecord<>(5, 12L));
-
-		testHarness.processWatermark1(new Watermark(42));
-		testHarness.processWatermark2(new Watermark(42));
-		testHarness.processElement2(new StreamRecord<>("6", 13L));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(17L));
-		expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
-		expectedOutput.add(new Watermark(42L));
-		expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(17);
-		testHarness.processElement1(new StreamRecord<>(5));
-
-		testHarness.setProcessingTime(42);
-		testHarness.processElement2(new StreamRecord<>("6"));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
-		expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testEventTimeTimers() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(17, 42L));
-		testHarness.processElement2(new StreamRecord<>("18", 42L));
-
-		testHarness.processWatermark1(new Watermark(5));
-		testHarness.processWatermark2(new Watermark(5));
-
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
-		expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
-		expectedOutput.add(new StreamRecord<>("1777", 5L));
-		expectedOutput.add(new Watermark(5L));
-		expectedOutput.add(new StreamRecord<>("1777", 6L));
-		expectedOutput.add(new Watermark(6L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testProcessingTimeTimers() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(17));
-		testHarness.processElement2(new StreamRecord<>("18"));
-
-		testHarness.setProcessingTime(5);
-		testHarness.setProcessingTime(6);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
-		expectedOutput.add(new StreamRecord<>("1777", 5L));
-		expectedOutput.add(new StreamRecord<>("1777", 6L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testEventTimeTimerWithState() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark1(new Watermark(1));
-		testHarness.processWatermark2(new Watermark(1));
-		testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
-
-		testHarness.processWatermark1(new Watermark(2));
-		testHarness.processWatermark2(new Watermark(2));
-		testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
-
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		testHarness.processWatermark1(new Watermark(7));
-		testHarness.processWatermark2(new Watermark(7));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(1L));
-		expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
-		expectedOutput.add(new Watermark(2L));
-		expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new Watermark(6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-		expectedOutput.add(new Watermark(7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testProcessingTimeTimerWithState() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(1);
-		testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
-
-		testHarness.setProcessingTime(2);
-		testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
-
-		testHarness.setProcessingTime(6);
-		testHarness.setProcessingTime(7);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testSnapshotAndRestore() throws Exception {
-
-		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(5, 12L));
-		testHarness.processElement2(new StreamRecord<>("5", 12L));
-
-		// snapshot and restore from scratch
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-		testHarness.close();
-
-		operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
-		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
-				operator,
-				new IntToStringKeySelector<>(),
-				new IdentityKeySelector<String>(),
-				BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-
-		testHarness.setProcessingTime(5);
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-		expectedOutput.add(new Watermark(6));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-
-	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Integer value) throws Exception {
-			return "" + value;
-		}
-	}
-
-	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public T getKey(T value) throws Exception {
-			return value;
-		}
-	}
-
-	private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-		}
-	}
-
-	private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			ctx.timerService().registerEventTimeTimer(5);
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-
-			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-			out.collect("" + 1777);
-		}
-	}
-
-	private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			getRuntimeContext().getState(state).update("" + value);
-			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			getRuntimeContext().getState(state).update(value);
-			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			ctx.timerService().registerProcessingTimeTimer(5);
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			ctx.timerService().registerProcessingTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-
-			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
-			out.collect("" + 1777);
-		}
-	}
-
-	private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-		}
-	}
-
-	private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			getRuntimeContext().getState(state).update("" + value);
-			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			getRuntimeContext().getState(state).update(value);
-			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerProcessingTimeTimer(5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-				out.collect("EVENT:1777");
-			} else {
-				out.collect("PROC:1777");
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 50526b5..a7325a4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 
@@ -101,30 +101,33 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams,
+   * Applies the given [[CoProcessFunction]] on the connected input streams,
    * thereby creating a transformed output stream.
    *
-   * The function will be called for every element in the streams and can produce
-   * zero or more output. The function can also query the time and set timers. When
-   * reacting to the firing of set timers the function can emit yet more elements.
+   * The function will be called for every element in the input streams and can produce zero
+   * or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] function,
+   * this function can also query the time and set timers. When reacting to the firing of set
+   * timers the function can directly emit elements and/or register yet more timers.
    *
-   * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+   * A [[RichCoProcessFunction]]
    * can be used to gain access to features provided by the
    * [[org.apache.flink.api.common.functions.RichFunction]] interface.
    *
-   * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element
-    *                     in the stream.
-    *
-   * @return The transformed { @link DataStream}.
+   * @param coProcessFunction The [[CoProcessFunction]] that is called for each element
+    *                    in the stream.
+   * @return The transformed [[DataStream]].
    */
-  def flatMap[R: TypeInformation](
-      coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+  @PublicEvolving
+  def process[R: TypeInformation](
+      coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = {
 
-    if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.")
+    if (coProcessFunction == null) {
+      throw new NullPointerException("CoProcessFunction function must not be null.")
+    }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
 
-    asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+    asScalaStream(javaStream.process(coProcessFunction, outType))
   }
 
 
@@ -144,14 +147,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * @return
     *        The resulting data stream.
    */
-  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]): 
+  def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
           DataStream[R] = {
-    
+
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
-    
-    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
     asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 66d80c2..f2999b3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescr
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
+import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -54,28 +54,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   // ------------------------------------------------------------------------
 
   /**
-    * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+    * Applies the given [[ProcessFunction]] on the input stream, thereby
     * creating a transformed output stream.
     *
     * The function will be called for every element in the stream and can produce
     * zero or more output. The function can also query the time and set timers. When
     * reacting to the firing of set timers the function can emit yet more elements.
     *
-    * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+    * The function will be called for every element in the input streams and can produce zero
+    * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+    * function, this function can also query the time and set timers. When reacting to the firing
+    * of set timers the function can directly emit elements and/or register yet more timers.
+    *
+    * A [[RichProcessFunction]]
     * can be used to gain access to features provided by the
     * [[org.apache.flink.api.common.functions.RichFunction]]
     *
-    * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+    * @param processFunction The [[ProcessFunction]] that is called for each element
     *                   in the stream.
     */
-  def flatMap[R: TypeInformation](
-      flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+  @PublicEvolving
+  def process[R: TypeInformation](
+    processFunction: ProcessFunction[T, R]): DataStream[R] = {
 
-    if (flatMapper == null) {
-      throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+    if (processFunction == null) {
+      throw new NullPointerException("ProcessFunction must not be null.")
     }
 
-    asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+    asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
   }
   
   // ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 967142b..adb59f2 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -23,11 +23,11 @@ import java.lang
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, OnTimerContext}
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -318,26 +318,26 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
   }
 
   /**
-   * Verify that a timely flat map call is correctly translated to an operator.
+   * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator.
    */
   @Test
-  def testTimelyFlatMapTranslation(): Unit = {
+  def testProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
 
-    val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
-      override def flatMap(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
+    val processFunction = new ProcessFunction[Long, Int] {
+      override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
       override def onTimer(
           timestamp: Long,
           ctx: OnTimerContext,
           out: Collector[Int]): Unit = ???
     }
 
-    val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+    val flatMapped = src.keyBy(x => x).process(processFunction)
 
-    assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
-    assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]])
+    assert(processFunction == getFunctionForDataStream(flatMapped))
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]])
   }
 
   @Test def operatorTest() {


[2/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process

Posted by al...@apache.org.
[FLINK-5026] Rename TimelyFlatMap to Process


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/910f733f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/910f733f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/910f733f

Branch: refs/heads/master
Commit: 910f733f5ec52d2dd1e9dcc4ec6a4844cae2f2b4
Parents: 3a27f55
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 11 10:57:25 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 18:38:29 2016 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        |  62 ++-
 .../streaming/api/datastream/KeyedStream.java   |  55 +-
 .../api/functions/ProcessFunction.java          | 109 ++++
 .../api/functions/RichProcessFunction.java      |  40 ++
 .../functions/RichTimelyFlatMapFunction.java    |  40 --
 .../api/functions/TimelyFlatMapFunction.java    | 110 ----
 .../api/functions/co/CoProcessFunction.java     | 130 +++++
 .../api/functions/co/RichCoProcessFunction.java |  41 ++
 .../co/RichTimelyCoFlatMapFunction.java         |  41 --
 .../functions/co/TimelyCoFlatMapFunction.java   | 128 -----
 .../api/operators/ProcessOperator.java          | 151 ++++++
 .../api/operators/StreamTimelyFlatMap.java      | 151 ------
 .../api/operators/co/CoProcessOperator.java     | 167 ++++++
 .../api/operators/co/CoStreamTimelyFlatMap.java | 167 ------
 .../flink/streaming/api/DataStreamTest.java     |  23 +-
 .../api/operators/ProcessOperatorTest.java      | 404 ++++++++++++++
 .../api/operators/TimelyFlatMapTest.java        | 404 --------------
 .../api/operators/co/CoProcessOperatorTest.java | 536 +++++++++++++++++++
 .../api/operators/co/TimelyCoFlatMapTest.java   | 536 -------------------
 .../streaming/api/scala/ConnectedStreams.scala  |  39 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  24 +-
 .../streaming/api/scala/DataStreamTest.scala    |  20 +-
 22 files changed, 1701 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index dc763cb..96a08d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -27,11 +27,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -234,64 +235,71 @@ public class ConnectedStreams<IN1, IN2> {
 	}
 
 	/**
-	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * Applies the given {@link CoProcessFunction} on the connected input streams,
 	 * thereby creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the streams and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero or
+	 * more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, this
+	 * function can also query the time and set timers. When reacting to the firing of set timers
+	 * the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * <p>A {@link RichCoProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(
+			CoProcessFunction<IN1, IN2, R> coProcessFunction) {
 
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
+				CoProcessFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return flatMap(coFlatMapper, outTypeInfo);
+		return process(coProcessFunction, outTypeInfo);
 	}
 
 	/**
-	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * Applies the given {@link CoProcessFunction} on the connected input streams,
 	 * thereby creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the streams and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function,
+	 * this function can also query the time and set timers. When reacting to the firing of set
+	 * timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * <p>A {@link RichCoProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
 	@Internal
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+	public <R> SingleOutputStreamOperator<R> process(
+			CoProcessFunction<IN1, IN2, R> coProcessFunction,
 			TypeInformation<R> outputType) {
 
-		CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
-				inputStream1.clean(coFlatMapper));
+		if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
+			throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
+					"when both input streams are keyed.");
+		}
 
-		return transform("Co-Flat Map", outputType, operator);
-	}
+		CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
+				inputStream1.clean(coProcessFunction));
 
+		return transform("Co-Process", outputType, operator);
+	}
 
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> transform(String functionName,

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 5b00bcd..560ecab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -32,7 +33,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -42,7 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -173,67 +175,70 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
 	 * creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the stream and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * <p>A {@link RichProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
-				flatMapper,
-				TimelyFlatMapFunction.class,
+				processFunction,
+				ProcessFunction.class,
 				false,
 				true,
 				getType(),
 				Utils.getCallLocationName(),
 				true);
 
-		return flatMap(flatMapper, outType);
+		return process(processFunction, outType);
 	}
 
 	/**
-	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
 	 * creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the stream and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * <p>A {@link RichProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 * @param outputType {@link TypeInformation} for the result type of the function.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
 	@Internal
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyFlatMapFunction<T, R> flatMapper,
+	public <R> SingleOutputStreamOperator<R> process(
+			ProcessFunction<T, R> processFunction,
 			TypeInformation<R> outputType) {
 
-		StreamTimelyFlatMap<KEY, T, R> operator =
-				new StreamTimelyFlatMap<>(clean(flatMapper));
+		ProcessOperator<KEY, T, R> operator =
+				new ProcessOperator<>(clean(processFunction));
 
-		return transform("Flat Map", outputType, operator);
+		return transform("Process", outputType, operator);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
new file mode 100644
index 0000000..fd0a829
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function that processes elements of a stream.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output elements. Contrary to the
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
+ * the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can directly emit a result, and/or
+ * register a timer that will trigger an action in the future.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public interface ProcessFunction<I, O> extends Function {
+
+	/**
+	 * Process one element from the input stream.
+	 *
+	 * <p>This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 *
+	 * @param value The input value.
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+	 *            a {@link TimerService} for registering timers and querying the time. The
+	 *            context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+
+	/**
+	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
+	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		Long timestamp();
+
+		/**
+		 * A {@link TimerService} for querying time and registering timers.
+		 */
+		TimerService timerService();
+	}
+
+	/**
+	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface OnTimerContext extends Context {
+		/**
+		 * The {@link TimeDomain} of the firing timer.
+		 */
+		TimeDomain timeDomain();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
new file mode 100644
index 0000000..834f717
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link ProcessFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichProcessFunction<I, O>
+		extends AbstractRichFunction
+		implements ProcessFunction<I, O> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
deleted file mode 100644
index 0d86da9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link TimelyFlatMapFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyFlatMapFunction<I, O>
-		extends AbstractRichFunction
-		implements TimelyFlatMapFunction<I, O> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
deleted file mode 100644
index 5f039c4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays.
- *
- * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
- * to them firing.
- *
- * <pre>{@code
- * DataStream<X> input = ...;
- *
- * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
- * }</pre>
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
-
-	/**
-	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
-	 * it into zero, one, or more elements.
-	 *
-	 * @param value The input value.
-	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
-	 *            a {@link TimerService} for registering timers and querying the time. The
-	 *            context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
-
-	/**
-	 * Called when a timer set using {@link TimerService} fires.
-	 *
-	 * @param timestamp The timestamp of the firing timer.
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
-
-	/**
-	 * Information available in an invocation of {@link #flatMap(Object, Context, Collector)}
-	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface Context {
-
-		/**
-		 * Timestamp of the element currently being processed or timestamp of a firing timer.
-		 *
-		 * <p>This might be {@code null}, for example if the time characteristic of your program
-		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
-		 */
-		Long timestamp();
-
-		/**
-		 * A {@link TimerService} for querying time and registering timers.
-		 */
-		TimerService timerService();
-	}
-
-	/**
-	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface OnTimerContext extends Context {
-		/**
-		 * The {@link TimeDomain} of the firing timer.
-		 */
-		TimeDomain timeDomain();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
new file mode 100644
index 0000000..feff8fb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A function that processes elements of two streams and produces a single output one.
+ *
+ * <p>The function will be called for every element in the input streams and can produce
+ * zero or more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also
+ * query the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>An example use-case for connected streams would be the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The rules
+ * contained in {@code stream A} can be stored in the state and wait for new elements to arrive on
+ * {@code stream B}. Upon reception of a new element on {@code stream B}, the function can now apply the
+ * previously stored rules to the element and directly emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 *
+	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 * 
+	 * @param value The stream element
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 *
+	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 * 
+	 * @param value The stream element
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+
+	/**
+	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
+	 * {@link #processElement2(Object, Context, Collector)}
+	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		Long timestamp();
+
+		/**
+		 * A {@link TimerService} for querying time and registering timers.
+		 */
+		TimerService timerService();
+	}
+
+	/**
+	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface OnTimerContext extends Context {
+		/**
+		 * The {@link TimeDomain} of the firing timer.
+		 */
+		TimeDomain timeDomain();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
new file mode 100644
index 0000000..0fcea91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichCoProcessFunction<IN1, IN2, OUT>
+		extends AbstractRichFunction
+		implements CoProcessFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
deleted file mode 100644
index 12fe181..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
-		extends AbstractRichFunction
-		implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
deleted file mode 100644
index 89c7d79..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
- * connected streams.
- * 
- * <p>The same instance of the transformation function is used to transform
- * both of the connected streams. That way, the stream transformations can
- * share state.
- *
- * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
- * {@link CoFlatMapFunction}, also set timers and react to them firing.
- * 
- * <p>An example for the use of connected streams would be to apply rules that change over time
- * onto elements of a stream. One of the connected streams has the rules, the other stream the
- * elements to apply the rules to. The operation on the connected stream maintains the 
- * current set of rules in the state. It may receive either a rule update (from the first stream)
- * and update the state, or a data element (from the second stream) and apply the rules in the
- * state to the element. The result of applying the rules would be emitted.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Output type.
- */
-@PublicEvolving
-public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	/**
-	 * This method is called for each element in the first of the connected streams.
-	 * 
-	 * @param value The stream element
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector to emit resulting elements to
-	 * @throws Exception The function may throw exceptions which cause the streaming program
-	 *                   to fail and go into recovery.
-	 */
-	void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
-
-	/**
-	 * This method is called for each element in the second of the connected streams.
-	 * 
-	 * @param value The stream element
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector to emit resulting elements to
-	 * @throws Exception The function may throw exceptions which cause the streaming program
-	 *                   to fail and go into recovery.
-	 */
-	void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
-
-	/**
-	 * Called when a timer set using {@link TimerService} fires.
-	 *
-	 * @param timestamp The timestamp of the firing timer.
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
-
-	/**
-	 * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/
-	 * {@link #flatMap2(Object, Context, Collector)}
-	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface Context {
-
-		/**
-		 * Timestamp of the element currently being processed or timestamp of a firing timer.
-		 *
-		 * <p>This might be {@code null}, for example if the time characteristic of your program
-		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
-		 */
-		Long timestamp();
-
-		/**
-		 * A {@link TimerService} for querying time and registering timers.
-		 */
-		TimerService timerService();
-	}
-
-	/**
-	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface OnTimerContext extends TimelyFlatMapFunction.Context {
-		/**
-		 * The {@link TimeDomain} of the firing timer.
-		 */
-		TimeDomain timeDomain();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
new file mode 100644
index 0000000..3b13360
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class ProcessOperator<K, IN, OUT>
+		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	private transient ContextImpl<IN> context;
+
+	private transient OnTimerContextImpl onTimerContext;
+
+	public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
+		super(flatMapper);
+
+		chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl<>(timerService);
+		onTimerContext = new OnTimerContextImpl(timerService);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	private static class ContextImpl<T> implements ProcessFunction.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<T> element;
+
+		ContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
deleted file mode 100644
index bafc435..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class StreamTimelyFlatMap<K, IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	private transient TimerService timerService;
-
-	private transient ContextImpl<IN> context;
-
-	private transient OnTimerContextImpl onTimerContext;
-
-	public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
-		super(flatMapper);
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		this.timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl<>(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	private static class ContextImpl<T> implements TimelyFlatMapFunction.Context {
-
-		private final TimerService timerService;
-
-		private StreamRecord<T> element;
-
-		ContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(element != null);
-
-			if (element.hasTimestamp()) {
-				return element.getTimestamp();
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-
-	private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{
-
-		private final TimerService timerService;
-
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
new file mode 100644
index 0000000..e6c3d3f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class CoProcessOperator<K, IN1, IN2, OUT>
+		extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	private transient ContextImpl context;
+
+	private transient OnTimerContextImpl onTimerContext;
+
+	public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl(timerService);
+		onTimerContext = new OnTimerContextImpl(timerService);
+	}
+
+	@Override
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement1(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement2(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
+
+	private static class ContextImpl implements CoProcessFunction.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<?> element;
+
+		ContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
deleted file mode 100644
index 75e4c14..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	private transient TimerService timerService;
-
-	private transient ContextImpl context;
-
-	private transient OnTimerContextImpl onTimerContext;
-
-	public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
-		super(flatMapper);
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		this.timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap1(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap2(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	protected TimestampedCollector<OUT> getCollector() {
-		return collector;
-	}
-
-	private static class ContextImpl implements TimelyCoFlatMapFunction.Context {
-
-		private final TimerService timerService;
-
-		private StreamRecord<?> element;
-
-		ContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(element != null);
-
-			if (element.hasTimestamp()) {
-				return element.getTimestamp();
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-
-	private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext {
-
-		private final TimerService timerService;
-
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8f002ba..eaac6b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -547,18 +547,19 @@ public class DataStreamTest {
 	}
 
 	/**
-	 * Verify that a timely flat map call is correctly translated to an operator.
+	 * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to
+	 * an operator.
 	 */
 	@Test
-	public void testTimelyFlatMapTranslation() {
+	public void testProcessTranslation() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
-		TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+		ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public void flatMap(
+			public void processElement(
 					Long value,
 					Context ctx,
 					Collector<Integer> out) throws Exception {
@@ -574,14 +575,14 @@ public class DataStreamTest {
 			}
 		};
 
-		DataStream<Integer> flatMapped = src
+		DataStream<Integer> processed = src
 				.keyBy(new IdentityKeySelector<Long>())
-				.flatMap(timelyFlatMapFunction);
+				.process(processFunction);
 
-		flatMapped.addSink(new DiscardingSink<Integer>());
+		processed.addSink(new DiscardingSink<Integer>());
 
-		assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
-		assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+		assertEquals(processFunction, getFunctionForDataStream(processed));
+		assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
 	}
 
 	@Test