You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 17:45:10 UTC
[1/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process
Repository: flink
Updated Branches:
refs/heads/master 3a27f55cf -> 910f733f5
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
new file mode 100644
index 0000000..74fd044
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link ProcessOperator}.
+ */
+public class ProcessOperatorTest extends TestLogger {
+
+ @Test
+ public void testTimestampAndWatermarkQuerying() throws Exception {
+ // Checks that processElement() can read the current watermark and the element timestamp.
+ ProcessOperator<Integer, Integer, String> operator =
+ new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(17));
+ testHarness.processElement(new StreamRecord<>(5, 12L)); // value 5, timestamp 12
+
+ testHarness.processWatermark(new Watermark(42));
+ testHarness.processElement(new StreamRecord<>(6, 13L)); // value 6, timestamp 13
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ // Records read "<value>TIME:<watermark> TS:<timestamp>"; the watermarks are expected in the output as well.
+ expectedOutput.add(new Watermark(17L));
+ expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+ expectedOutput.add(new Watermark(42L));
+ expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testTimestampAndProcessingTimeQuerying() throws Exception {
+ // Checks that processElement() can read the current processing time; records without a timestamp report "TS:null".
+ ProcessOperator<Integer, Integer, String> operator =
+ new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(17);
+ testHarness.processElement(new StreamRecord<>(5)); // no timestamp attached
+
+ testHarness.setProcessingTime(42);
+ testHarness.processElement(new StreamRecord<>(6)); // no timestamp attached
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ // Records read "<value>TIME:<processing time> TS:<timestamp>".
+ expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+ expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testEventTimeTimers() throws Exception {
+ // Checks that an event-time timer registered in processElement() fires when the watermark reaches it.
+ ProcessOperator<Integer, Integer, Integer> operator =
+ new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(0));
+
+ testHarness.processElement(new StreamRecord<>(17, 42L)); // registers an event-time timer at current watermark + 5
+
+ testHarness.processWatermark(new Watermark(5));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(0L));
+ expectedOutput.add(new StreamRecord<>(17, 42L));
+ expectedOutput.add(new StreamRecord<>(1777, 5L)); // emitted by onTimer(); expected before Watermark(5) in the output
+ expectedOutput.add(new Watermark(5L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTimers() throws Exception {
+ // Checks that a processing-time timer registered in processElement() fires when processing time advances.
+ ProcessOperator<Integer, Integer, Integer> operator =
+ new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(17)); // registers a timer at current processing time + 5
+
+ testHarness.setProcessingTime(5); // the timer output below is expected after this call
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>(17));
+ expectedOutput.add(new StreamRecord<>(1777, 5L)); // emitted by onTimer()
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that state does not leak between keys: each event-time timer must read the value stored for its own key.
+ */
+ @Test
+ public void testEventTimeTimerWithState() throws Exception {
+
+ ProcessOperator<Integer, Integer, String> operator =
+ new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(1));
+ testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+ testHarness.processWatermark(new Watermark(2));
+ testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+ testHarness.processWatermark(new Watermark(6));
+ testHarness.processWatermark(new Watermark(7));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(1L));
+ expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+ expectedOutput.add(new Watermark(2L));
+ expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); // timer at 6 reports the value stored for key 17
+ expectedOutput.add(new Watermark(6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); // timer at 7 reports the value stored for key 42
+ expectedOutput.add(new Watermark(7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that state does not leak between keys: each processing-time timer must read the value stored for its own key.
+ */
+ @Test
+ public void testProcessingTimeTimerWithState() throws Exception {
+
+ ProcessOperator<Integer, Integer, String> operator =
+ new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(1);
+ testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+ testHarness.setProcessingTime(6);
+ testHarness.setProcessingTime(7);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT:42"));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L)); // timer at 6 reports the value stored for key 17
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L)); // timer at 7 reports the value stored for key 42
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+ // Checks that processing-time and event-time timers survive a snapshot/restore cycle.
+ ProcessOperator<Integer, Integer, String> operator =
+ new ProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(5, 12L)); // registers a processing-time timer (5) and an event-time timer (6)
+
+ // snapshot and restore from scratch
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness.close();
+
+ operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.setProcessingTime(5);
+ testHarness.processWatermark(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ // No element is processed after the restore, so both firings below can
+ // only come from the timers that were registered before the snapshot.
+ expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+ expectedOutput.add(new Watermark(6));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> { // uses every element as its own key
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> {
+ // Emits "<value>TIME:<current time> TS:<element timestamp>" per element; timeDomain selects watermark or processing time.
+ private static final long serialVersionUID = 1L;
+
+ private final TimeDomain timeDomain;
+
+ public QueryingFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ } else {
+ out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception { // intentionally empty: processElement() registers no timers
+ }
+ }
+
+ private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> {
+ // Forwards each element and registers a timer at "current time + 5" in the configured domain; onTimer() emits 1777.
+ private static final long serialVersionUID = 1L;
+
+ private final TimeDomain timeDomain;
+
+ public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ } else {
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Integer> out) throws Exception {
+ // The timer must fire in the same time domain it was registered in.
+ assertEquals(this.timeDomain, ctx.timeDomain());
+ out.collect(1777);
+ }
+ }
+
+ private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> {
+ // Stores each element in the "seen-element" state and registers a timer; onTimer() reports the stored value.
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Integer> state =
+ new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+
+ private final TimeDomain timeDomain;
+
+ public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT:" + value);
+ getRuntimeContext().getState(state).update(value); // remember the element so onTimer() can report it
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ } else {
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(this.timeDomain, ctx.timeDomain()); // timer must fire in the domain it was registered in
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> {
+ // Registers a processing-time timer (5) and an event-time timer (6) per element; onTimer() reports which domain fired.
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerProcessingTimeTimer(5); // absolute times, independent of the element
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+ out.collect("EVENT:1777");
+ } else {
+ out.collect("PROC:1777");
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
deleted file mode 100644
index 6080ddc..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link StreamTimelyFlatMap}.
- */
-public class TimelyFlatMapTest extends TestLogger {
-
- @Test
- public void testTimestampAndWatermarkQuerying() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark(new Watermark(17));
- testHarness.processElement(new StreamRecord<>(5, 12L));
-
- testHarness.processWatermark(new Watermark(42));
- testHarness.processElement(new StreamRecord<>(6, 13L));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(17L));
- expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
- expectedOutput.add(new Watermark(42L));
- expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(17);
- testHarness.processElement(new StreamRecord<>(5));
-
- testHarness.setProcessingTime(42);
- testHarness.processElement(new StreamRecord<>(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
- expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testEventTimeTimers() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark(new Watermark(0));
-
- testHarness.processElement(new StreamRecord<>(17, 42L));
-
- testHarness.processWatermark(new Watermark(5));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(0L));
- expectedOutput.add(new StreamRecord<>(17, 42L));
- expectedOutput.add(new StreamRecord<>(1777, 5L));
- expectedOutput.add(new Watermark(5L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testProcessingTimeTimers() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, Integer> operator =
- new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(17));
-
- testHarness.setProcessingTime(5);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>(17));
- expectedOutput.add(new StreamRecord<>(1777, 5L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testEventTimeTimerWithState() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark(new Watermark(1));
- testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
-
- testHarness.processWatermark(new Watermark(2));
- testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
-
- testHarness.processWatermark(new Watermark(6));
- testHarness.processWatermark(new Watermark(7));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(1L));
- expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
- expectedOutput.add(new Watermark(2L));
- expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new Watermark(6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
- expectedOutput.add(new Watermark(7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testProcessingTimeTimerWithState() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(1);
- testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
-
- testHarness.setProcessingTime(2);
- testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
-
- testHarness.setProcessingTime(6);
- testHarness.setProcessingTime(7);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT:17"));
- expectedOutput.add(new StreamRecord<>("INPUT:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testSnapshotAndRestore() throws Exception {
-
- StreamTimelyFlatMap<Integer, Integer, String> operator =
- new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(5, 12L));
-
- // snapshot and restore from scratch
- OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
- testHarness.close();
-
- operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.open();
-
- testHarness.setProcessingTime(5);
- testHarness.processWatermark(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
- expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
- expectedOutput.add(new Watermark(6));
-
- System.out.println("GOT: " + testHarness.getOutput());
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- private static class IdentityKeySelector<T> implements KeySelector<T, T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public T getKey(T value) throws Exception {
- return value;
- }
- }
-
- private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final TimeDomain timeDomain;
-
- public QueryingFlatMapFunction(TimeDomain timeDomain) {
- this.timeDomain = timeDomain;
- }
-
- @Override
- public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
- if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
- out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
- } else {
- out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
- }
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- }
- }
-
- private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- private final TimeDomain timeDomain;
-
- public TriggeringFlatMapFunction(TimeDomain timeDomain) {
- this.timeDomain = timeDomain;
- }
-
- @Override
- public void flatMap(Integer value, Context ctx, Collector<Integer> out) throws Exception {
- out.collect(value);
- if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- } else {
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<Integer> out) throws Exception {
-
- assertEquals(this.timeDomain, ctx.timeDomain());
- out.collect(1777);
- }
- }
-
- private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<Integer> state =
- new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
-
- private final TimeDomain timeDomain;
-
- public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
- this.timeDomain = timeDomain;
- }
-
- @Override
- public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT:" + value);
- getRuntimeContext().getState(state).update(value);
- if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- } else {
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(this.timeDomain, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Integer value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerProcessingTimeTimer(5);
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
- out.collect("EVENT:1777");
- } else {
- out.collect("PROC:1777");
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
new file mode 100644
index 0000000..a449359
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -0,0 +1,536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link CoProcessOperator}.
+ */
+public class CoProcessOperatorTest extends TestLogger {
+
+ @Test
+ public void testTimestampAndWatermarkQuerying() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new WatermarkQueryingProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(17));
+ testHarness.processWatermark2(new Watermark(17));
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+ testHarness.processWatermark1(new Watermark(42));
+ testHarness.processWatermark2(new Watermark(42));
+ testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(17L));
+ expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+ expectedOutput.add(new Watermark(42L));
+ expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(17);
+ testHarness.processElement1(new StreamRecord<>(5));
+
+ testHarness.setProcessingTime(42);
+ testHarness.processElement2(new StreamRecord<>("6"));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+ expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testEventTimeTimers() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new EventTimeTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17, 42L));
+ testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+ testHarness.processWatermark1(new Watermark(5));
+ testHarness.processWatermark2(new Watermark(5));
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+ expectedOutput.add(new StreamRecord<>("1777", 5L));
+ expectedOutput.add(new Watermark(5L));
+ expectedOutput.add(new StreamRecord<>("1777", 6L));
+ expectedOutput.add(new Watermark(6L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTimers() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17));
+ testHarness.processElement2(new StreamRecord<>("18"));
+
+ testHarness.setProcessingTime(5);
+ testHarness.setProcessingTime(6);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+ expectedOutput.add(new StreamRecord<>("1777", 5L));
+ expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testEventTimeTimerWithState() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+ testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+ testHarness.processWatermark1(new Watermark(2));
+ testHarness.processWatermark2(new Watermark(2));
+ testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ testHarness.processWatermark1(new Watermark(7));
+ testHarness.processWatermark2(new Watermark(7));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(1L));
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+ expectedOutput.add(new Watermark(2L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new Watermark(6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new Watermark(7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testProcessingTimeTimerWithState() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(1);
+ testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
+
+ testHarness.setProcessingTime(6);
+ testHarness.setProcessingTime(7);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+
+ CoProcessOperator<String, Integer, String, String> operator =
+ new CoProcessOperator<>(new BothTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+ testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+ // snapshot and restore from scratch
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness.close();
+
+ operator = new CoProcessOperator<>(new BothTriggeringProcessFunction());
+
+ testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.setProcessingTime(5);
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+ expectedOutput.add(new Watermark(6));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+
+ private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Integer value) throws Exception {
+ return "" + value;
+ }
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ }
+ }
+
+ private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerEventTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+
+ assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+ out.collect("" + 1777);
+ }
+ }
+
+ private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ getRuntimeContext().getState(state).update("" + value);
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ getRuntimeContext().getState(state).update(value);
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerProcessingTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerProcessingTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+
+ assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+ out.collect("" + 1777);
+ }
+ }
+
+ private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ }
+ }
+
+ private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ getRuntimeContext().getState(state).update("" + value);
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ getRuntimeContext().getState(state).update(value);
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerProcessingTimeTimer(5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+ out.collect("EVENT:1777");
+ } else {
+ out.collect("PROC:1777");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
deleted file mode 100644
index 7c29631..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ /dev/null
@@ -1,536 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.operators.co;
-
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.TestLogger;
-import org.junit.Test;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests {@link CoStreamTimelyFlatMap}.
- */
-public class TimelyCoFlatMapTest extends TestLogger {
-
- @Test
- public void testTimestampAndWatermarkQuerying() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark1(new Watermark(17));
- testHarness.processWatermark2(new Watermark(17));
- testHarness.processElement1(new StreamRecord<>(5, 12L));
-
- testHarness.processWatermark1(new Watermark(42));
- testHarness.processWatermark2(new Watermark(42));
- testHarness.processElement2(new StreamRecord<>("6", 13L));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(17L));
- expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
- expectedOutput.add(new Watermark(42L));
- expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testTimestampAndProcessingTimeQuerying() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(17);
- testHarness.processElement1(new StreamRecord<>(5));
-
- testHarness.setProcessingTime(42);
- testHarness.processElement2(new StreamRecord<>("6"));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
- expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testEventTimeTimers() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(17, 42L));
- testHarness.processElement2(new StreamRecord<>("18", 42L));
-
- testHarness.processWatermark1(new Watermark(5));
- testHarness.processWatermark2(new Watermark(5));
-
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
- expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
- expectedOutput.add(new StreamRecord<>("1777", 5L));
- expectedOutput.add(new Watermark(5L));
- expectedOutput.add(new StreamRecord<>("1777", 6L));
- expectedOutput.add(new Watermark(6L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testProcessingTimeTimers() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(17));
- testHarness.processElement2(new StreamRecord<>("18"));
-
- testHarness.setProcessingTime(5);
- testHarness.setProcessingTime(6);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17"));
- expectedOutput.add(new StreamRecord<>("INPUT2:18"));
- expectedOutput.add(new StreamRecord<>("1777", 5L));
- expectedOutput.add(new StreamRecord<>("1777", 6L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testEventTimeTimerWithState() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark1(new Watermark(1));
- testHarness.processWatermark2(new Watermark(1));
- testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
-
- testHarness.processWatermark1(new Watermark(2));
- testHarness.processWatermark2(new Watermark(2));
- testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
-
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- testHarness.processWatermark1(new Watermark(7));
- testHarness.processWatermark2(new Watermark(7));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(1L));
- expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
- expectedOutput.add(new Watermark(2L));
- expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new Watermark(6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
- expectedOutput.add(new Watermark(7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testProcessingTimeTimerWithState() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(1);
- testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
-
- testHarness.setProcessingTime(2);
- testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
-
- testHarness.setProcessingTime(6);
- testHarness.setProcessingTime(7);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17"));
- expectedOutput.add(new StreamRecord<>("INPUT2:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testSnapshotAndRestore() throws Exception {
-
- CoStreamTimelyFlatMap<String, Integer, String, String> operator =
- new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(5, 12L));
- testHarness.processElement2(new StreamRecord<>("5", 12L));
-
- // snapshot and restore from scratch
- OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
- testHarness.close();
-
- operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
-
- testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.open();
-
- testHarness.setProcessingTime(5);
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
- expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
- expectedOutput.add(new Watermark(6));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
-
- private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Integer value) throws Exception {
- return "" + value;
- }
- }
-
- private static class IdentityKeySelector<T> implements KeySelector<T, T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public T getKey(T value) throws Exception {
- return value;
- }
- }
-
- private static class WatermarkQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- }
- }
-
- private static class EventTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- ctx.timerService().registerEventTimeTimer(5);
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
-
- assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
- out.collect("" + 1777);
- }
- }
-
- private static class EventTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<String> state =
- new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- getRuntimeContext().getState(state).update("" + value);
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- getRuntimeContext().getState(state).update(value);
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class ProcessingTimeTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- ctx.timerService().registerProcessingTimeTimer(5);
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- ctx.timerService().registerProcessingTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
-
- assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
- out.collect("" + 1777);
- }
- }
-
- private static class ProcessingTimeQueryingFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- }
- }
-
- private static class ProcessingTimeTriggeringStatefulFlatMapFunction extends RichTimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<String> state =
- new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE, null);
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- getRuntimeContext().getState(state).update("" + value);
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- getRuntimeContext().getState(state).update(value);
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class BothTriggeringFlatMapFunction implements TimelyCoFlatMapFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(Integer value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void flatMap2(String value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerProcessingTimeTimer(5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
- out.collect("EVENT:1777");
- } else {
- out.collect("PROC:1777");
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 50526b5..a7325a4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction}
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.util.Collector
@@ -101,30 +101,33 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
/**
- * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams,
+ * Applies the given [[CoProcessFunction]] on the connected input streams,
* thereby creating a transformed output stream.
*
- * The function will be called for every element in the streams and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * The function will be called for every element in the input streams and can produce zero
+ * or more output elements. Contrary to the [[flatMap(CoFlatMapFunction)]] function,
+ * this function can also query the time and set timers. When reacting to the firing of set
+ * timers the function can directly emit elements and/or register yet more timers.
*
- * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+ * A [[RichCoProcessFunction]]
* can be used to gain access to features provided by the
* [[org.apache.flink.api.common.functions.RichFunction]] interface.
*
- * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element
- * in the stream.
- *
- * @return The transformed { @link DataStream}.
+ * @param coProcessFunction The [[CoProcessFunction]] that is called for each element
+ * in the stream.
+ * @return The transformed [[DataStream]].
*/
- def flatMap[R: TypeInformation](
- coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+ @PublicEvolving
+ def process[R: TypeInformation](
+ coProcessFunction: CoProcessFunction[IN1, IN2, R]) : DataStream[R] = {
- if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.")
+ if (coProcessFunction == null) {
+ throw new NullPointerException("CoProcessFunction function must not be null.")
+ }
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+ asScalaStream(javaStream.process(coProcessFunction, outType))
}
@@ -144,14 +147,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return
* The resulting data stream.
*/
- def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
+ def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
DataStream[R] = {
-
+
if (coFlatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
-
- val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+ val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 66d80c2..f2999b3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescr
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
+import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -54,28 +54,34 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
// ------------------------------------------------------------------------
/**
- * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+ * Applies the given [[ProcessFunction]] on the input stream, thereby
* creating a transformed output stream.
*
* The function will be called for every element in the stream and can produce
* zero or more output. The function can also query the time and set timers. When
* reacting to the firing of set timers the function can emit yet more elements.
*
- * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+ * The function will be called for every element in the input stream and can produce zero
+ * or more output elements. Contrary to the [[DataStream#flatMap(FlatMapFunction)]]
+ * function, this function can also query the time and set timers. When reacting to the firing
+ * of set timers the function can directly emit elements and/or register yet more timers.
+ *
+ * A [[RichProcessFunction]]
* can be used to gain access to features provided by the
* [[org.apache.flink.api.common.functions.RichFunction]]
*
- * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+ * @param processFunction The [[ProcessFunction]] that is called for each element
* in the stream.
*/
- def flatMap[R: TypeInformation](
- flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+ @PublicEvolving
+ def process[R: TypeInformation](
+ processFunction: ProcessFunction[T, R]): DataStream[R] = {
- if (flatMapper == null) {
- throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+ if (processFunction == null) {
+ throw new NullPointerException("ProcessFunction must not be null.")
}
- asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+ asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 967142b..adb59f2 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -23,11 +23,11 @@ import java.lang
import org.apache.flink.api.common.functions._
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction.{Context, OnTimerContext}
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -318,26 +318,26 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
/**
- * Verify that a timely flat map call is correctly translated to an operator.
+ * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator.
*/
@Test
- def testTimelyFlatMapTranslation(): Unit = {
+ def testProcessTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val src = env.generateSequence(0, 0)
- val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
- override def flatMap(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
+ val processFunction = new ProcessFunction[Long, Int] {
+ override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
override def onTimer(
timestamp: Long,
ctx: OnTimerContext,
out: Collector[Int]): Unit = ???
}
- val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+ val flatMapped = src.keyBy(x => x).process(processFunction)
- assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
- assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]])
+ assert(processFunction == getFunctionForDataStream(flatMapped))
+ assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]])
}
@Test def operatorTest() {
[2/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process
Posted by al...@apache.org.
[FLINK-5026] Rename TimelyFlatMap to Process
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/910f733f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/910f733f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/910f733f
Branch: refs/heads/master
Commit: 910f733f5ec52d2dd1e9dcc4ec6a4844cae2f2b4
Parents: 3a27f55
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 11 10:57:25 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 18:38:29 2016 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedStreams.java | 62 ++-
.../streaming/api/datastream/KeyedStream.java | 55 +-
.../api/functions/ProcessFunction.java | 109 ++++
.../api/functions/RichProcessFunction.java | 40 ++
.../functions/RichTimelyFlatMapFunction.java | 40 --
.../api/functions/TimelyFlatMapFunction.java | 110 ----
.../api/functions/co/CoProcessFunction.java | 130 +++++
.../api/functions/co/RichCoProcessFunction.java | 41 ++
.../co/RichTimelyCoFlatMapFunction.java | 41 --
.../functions/co/TimelyCoFlatMapFunction.java | 128 -----
.../api/operators/ProcessOperator.java | 151 ++++++
.../api/operators/StreamTimelyFlatMap.java | 151 ------
.../api/operators/co/CoProcessOperator.java | 167 ++++++
.../api/operators/co/CoStreamTimelyFlatMap.java | 167 ------
.../flink/streaming/api/DataStreamTest.java | 23 +-
.../api/operators/ProcessOperatorTest.java | 404 ++++++++++++++
.../api/operators/TimelyFlatMapTest.java | 404 --------------
.../api/operators/co/CoProcessOperatorTest.java | 536 +++++++++++++++++++
.../api/operators/co/TimelyCoFlatMapTest.java | 536 -------------------
.../streaming/api/scala/ConnectedStreams.scala | 39 +-
.../flink/streaming/api/scala/KeyedStream.scala | 24 +-
.../streaming/api/scala/DataStreamTest.scala | 20 +-
22 files changed, 1701 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index dc763cb..96a08d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -27,11 +27,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import static java.util.Objects.requireNonNull;
@@ -234,64 +235,71 @@ public class ConnectedStreams<IN1, IN2> {
}
/**
- * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * Applies the given {@link CoProcessFunction} on the connected input streams,
* thereby creating a transformed output stream.
*
- * <p>The function will be called for every element in the streams and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input streams and can produce zero or
+ * more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, this
+ * function can also query the time and set timers. When reacting to the firing of set timers
+ * the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(
+ CoProcessFunction<IN1, IN2, R> coProcessFunction) {
- TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
- TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
+ CoProcessFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return flatMap(coFlatMapper, outTypeInfo);
+ return process(coProcessFunction, outTypeInfo);
}
/**
- * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * Applies the given {@link CoProcessFunction} on the connected input streams,
* thereby creating a transformed output stream.
*
- * <p>The function will be called for every element in the streams and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function,
+ * this function can also query the time and set timers. When reacting to the firing of set
+ * timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+ public <R> SingleOutputStreamOperator<R> process(
+ CoProcessFunction<IN1, IN2, R> coProcessFunction,
TypeInformation<R> outputType) {
- CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
- inputStream1.clean(coFlatMapper));
+ if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
+ throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
+ "when both input streams are keyed.");
+ }
- return transform("Co-Flat Map", outputType, operator);
- }
+ CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
+ inputStream1.clean(coProcessFunction));
+ return transform("Co-Process", outputType, operator);
+ }
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String functionName,
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 5b00bcd..560ecab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -32,7 +33,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -42,7 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -173,67 +175,70 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
/**
- * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
- * <p>The function will be called for every element in the stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input stream and can produce zero
+ * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+ * function, this function can also query the time and set timers. When reacting to the firing
+ * of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
- flatMapper,
- TimelyFlatMapFunction.class,
+ processFunction,
+ ProcessFunction.class,
false,
true,
getType(),
Utils.getCallLocationName(),
true);
- return flatMap(flatMapper, outType);
+ return process(processFunction, outType);
}
/**
- * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
- * <p>The function will be called for every element in the stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input stream and can produce zero
+ * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+ * function, this function can also query the time and set timers. When reacting to the firing
+ * of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
*
- * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyFlatMapFunction<T, R> flatMapper,
+ public <R> SingleOutputStreamOperator<R> process(
+ ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {
- StreamTimelyFlatMap<KEY, T, R> operator =
- new StreamTimelyFlatMap<>(clean(flatMapper));
+ ProcessOperator<KEY, T, R> operator =
+ new ProcessOperator<>(clean(processFunction));
- return transform("Flat Map", outputType, operator);
+ return transform("Process", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
new file mode 100644
index 0000000..fd0a829
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function that processes elements of a stream.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output elements. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output elements. Contrary to the
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
+ * the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can directly emit a result, and/or
+ * register a timer that will trigger an action in the future.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public interface ProcessFunction<I, O> extends Function {
+
+ /**
+ * Process one element from the input stream.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The input value.
+ * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+ * a {@link TimerService} for registering timers and querying the time. The
+ * context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+
+ /**
+ * Called when a timer set using {@link TimerService} fires.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+ * querying the {@link TimeDomain} of the firing timer and getting a
+ * {@link TimerService} for registering timers and querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+
+ /**
+ * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
+ * or {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface Context {
+
+ /**
+ * Timestamp of the element currently being processed or timestamp of a firing timer.
+ *
+ * <p>This might be {@code null}, for example if the time characteristic of your program
+ * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */
+ Long timestamp();
+
+ /**
+ * A {@link TimerService} for querying time and registering timers.
+ */
+ TimerService timerService();
+ }
+
+ /**
+ * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface OnTimerContext extends Context {
+ /**
+ * The {@link TimeDomain} of the firing timer.
+ */
+ TimeDomain timeDomain();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
new file mode 100644
index 0000000..834f717
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link ProcessFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichProcessFunction<I, O>
+ extends AbstractRichFunction
+ implements ProcessFunction<I, O> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
deleted file mode 100644
index 0d86da9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link TimelyFlatMapFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyFlatMapFunction<I, O>
- extends AbstractRichFunction
- implements TimelyFlatMapFunction<I, O> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
deleted file mode 100644
index 5f039c4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays.
- *
- * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
- * to them firing.
- *
- * <pre>{@code
- * DataStream<X> input = ...;
- *
- * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
- * }</pre>
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
-
- /**
- * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
- * it into zero, one, or more elements.
- *
- * @param value The input value.
- * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
- * a {@link TimerService} for registering timers and querying the time. The
- * context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
-
- /**
- * Called when a timer set using {@link TimerService} fires.
- *
- * @param timestamp The timestamp of the firing timer.
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
-
- /**
- * Information available in an invocation of {@link #flatMap(Object, Context, Collector)}
- * or {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface Context {
-
- /**
- * Timestamp of the element currently being processed or timestamp of a firing timer.
- *
- * <p>This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
- */
- Long timestamp();
-
- /**
- * A {@link TimerService} for querying time and registering timers.
- */
- TimerService timerService();
- }
-
- /**
- * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface OnTimerContext extends Context {
- /**
- * The {@link TimeDomain} of the firing timer.
- */
- TimeDomain timeDomain();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
new file mode 100644
index 0000000..feff8fb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A function that processes elements of two streams and produces a single output one.
+ *
+ * <p>The function will be called for every element in the input streams and can produce
+ * zero or more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also
+ * query the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>An example use-case for connected streams would be the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream ({@code stream B}). The rules
+ * contained in {@code stream A} can be stored in the state and wait for new elements to arrive on
+ * {@code stream B}. Upon reception of a new element on {@code stream B}, the function can now apply the
+ * previously stored rules to the element and directly emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+ /**
+ * This method is called for each element in the first of the connected streams.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The stream element
+ * @param ctx A {@link Context} that allows querying the timestamp of the element
+ * and getting a {@link TimerService} for registering timers and
+ * querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector to emit resulting elements to
+ * @throws Exception The function may throw exceptions which cause the streaming program
+ * to fail and go into recovery.
+ */
+ void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * This method is called for each element in the second of the connected streams.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The stream element
+ * @param ctx A {@link Context} that allows querying the timestamp of the element
+ * and getting a {@link TimerService} for registering timers and
+ * querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector to emit resulting elements to
+ * @throws Exception The function may throw exceptions which cause the streaming program
+ * to fail and go into recovery.
+ */
+ void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * Called when a timer set using {@link TimerService} fires.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+ * querying the {@link TimeDomain} of the firing timer and getting a
+ * {@link TimerService} for registering timers and querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+
+ /**
+ * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
+ * {@link #processElement2(Object, Context, Collector)}
+ * or {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface Context {
+
+ /**
+ * Timestamp of the element currently being processed or timestamp of a firing timer.
+ *
+ * <p>This might be {@code null}, for example if the time characteristic of your program
+ * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */
+ Long timestamp();
+
+ /**
+ * A {@link TimerService} for querying time and registering timers.
+ */
+ TimerService timerService();
+ }
+
+ /**
+ * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface OnTimerContext extends Context {
+ /**
+ * The {@link TimeDomain} of the firing timer.
+ */
+ TimeDomain timeDomain();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
new file mode 100644
index 0000000..0fcea91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichCoProcessFunction<IN1, IN2, OUT>
+ extends AbstractRichFunction
+ implements CoProcessFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
deleted file mode 100644
index 12fe181..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
- extends AbstractRichFunction
- implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
deleted file mode 100644
index 89c7d79..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
- * connected streams.
- *
- * <p>The same instance of the transformation function is used to transform
- * both of the connected streams. That way, the stream transformations can
- * share state.
- *
- * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
- * {@link CoFlatMapFunction}, also set timers and react to them firing.
- *
- * <p>An example for the use of connected streams would be to apply rules that change over time
- * onto elements of a stream. One of the connected streams has the rules, the other stream the
- * elements to apply the rules to. The operation on the connected stream maintains the
- * current set of rules in the state. It may receive either a rule update (from the first stream)
- * and update the state, or a data element (from the second stream) and apply the rules in the
- * state to the element. The result of applying the rules would be emitted.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Output type.
- */
-@PublicEvolving
-public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
- /**
- * This method is called for each element in the first of the connected streams.
- *
- * @param value The stream element
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector to emit resulting elements to
- * @throws Exception The function may throw exceptions which cause the streaming program
- * to fail and go into recovery.
- */
- void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * This method is called for each element in the second of the connected streams.
- *
- * @param value The stream element
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector to emit resulting elements to
- * @throws Exception The function may throw exceptions which cause the streaming program
- * to fail and go into recovery.
- */
- void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * Called when a timer set using {@link TimerService} fires.
- *
- * @param timestamp The timestamp of the firing timer.
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
-
- /**
- * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/
- * {@link #flatMap2(Object, Context, Collector)}
- * or {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface Context {
-
- /**
- * Timestamp of the element currently being processed or timestamp of a firing timer.
- *
- * <p>This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
- */
- Long timestamp();
-
- /**
- * A {@link TimerService} for querying time and registering timers.
- */
- TimerService timerService();
- }
-
- /**
- * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface OnTimerContext extends TimelyFlatMapFunction.Context {
- /**
- * The {@link TimeDomain} of the firing timer.
- */
- TimeDomain timeDomain();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
new file mode 100644
index 0000000..3b13360
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class ProcessOperator<K, IN, OUT>
+ extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient TimerService timerService;
+
+ private transient ContextImpl<IN> context;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
+ super(flatMapper);
+
+ chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ this.timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(timerService);
+ onTimerContext = new OnTimerContextImpl(timerService);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ private static class ContextImpl<T> implements ProcessFunction.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<T> element;
+
+ ContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
deleted file mode 100644
index bafc435..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class StreamTimelyFlatMap<K, IN, OUT>
- extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector;
-
- private transient TimerService timerService;
-
- private transient ContextImpl<IN> context;
-
- private transient OnTimerContextImpl onTimerContext;
-
- public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
- super(flatMapper);
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<>(output);
-
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
- this.timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl<>(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
- }
-
- @Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap(element.getValue(), context, collector);
- context.element = null;
- }
-
- private static class ContextImpl<T> implements TimelyFlatMapFunction.Context {
-
- private final TimerService timerService;
-
- private StreamRecord<T> element;
-
- ContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public Long timestamp() {
- checkState(element != null);
-
- if (element.hasTimestamp()) {
- return element.getTimestamp();
- } else {
- return null;
- }
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-
- private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{
-
- private final TimerService timerService;
-
- private TimeDomain timeDomain;
-
- private InternalTimer<?, VoidNamespace> timer;
-
- OnTimerContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
- public Long timestamp() {
- checkState(timer != null);
- return timer.getTimestamp();
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
new file mode 100644
index 0000000..e6c3d3f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class CoProcessOperator<K, IN1, IN2, OUT>
+ extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient TimerService timerService;
+
+ private transient ContextImpl context;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+ super(flatMapper);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ this.timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl(timerService);
+ onTimerContext = new OnTimerContextImpl(timerService);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement1(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement2(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ protected TimestampedCollector<OUT> getCollector() {
+ return collector;
+ }
+
+ private static class ContextImpl implements CoProcessFunction.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<?> element;
+
+ ContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
deleted file mode 100644
index 75e4c14..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
- extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector;
-
- private transient TimerService timerService;
-
- private transient ContextImpl context;
-
- private transient OnTimerContextImpl onTimerContext;
-
- public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
- super(flatMapper);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<>(output);
-
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
- this.timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap1(element.getValue(), context, collector);
- context.element = null;
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap2(element.getValue(), context, collector);
- context.element = null;
- }
-
- @Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- protected TimestampedCollector<OUT> getCollector() {
- return collector;
- }
-
- private static class ContextImpl implements TimelyCoFlatMapFunction.Context {
-
- private final TimerService timerService;
-
- private StreamRecord<?> element;
-
- ContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public Long timestamp() {
- checkState(element != null);
-
- if (element.hasTimestamp()) {
- return element.getTimestamp();
- } else {
- return null;
- }
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-
- private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext {
-
- private final TimerService timerService;
-
- private TimeDomain timeDomain;
-
- private InternalTimer<?, VoidNamespace> timer;
-
- OnTimerContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
- public Long timestamp() {
- checkState(timer != null);
- return timer.getTimestamp();
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8f002ba..eaac6b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -547,18 +547,19 @@ public class DataStreamTest {
}
/**
- * Verify that a timely flat map call is correctly translated to an operator.
+ * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to
+ * an operator.
*/
@Test
- public void testTimelyFlatMapTranslation() {
+ public void testProcessTranslation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);
- TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+ ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
- public void flatMap(
+ public void processElement(
Long value,
Context ctx,
Collector<Integer> out) throws Exception {
@@ -574,14 +575,14 @@ public class DataStreamTest {
}
};
- DataStream<Integer> flatMapped = src
+ DataStream<Integer> processed = src
.keyBy(new IdentityKeySelector<Long>())
- .flatMap(timelyFlatMapFunction);
+ .process(processFunction);
- flatMapped.addSink(new DiscardingSink<Integer>());
+ processed.addSink(new DiscardingSink<Integer>());
- assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
- assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+ assertEquals(processFunction, getFunctionForDataStream(processed));
+ assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
}
@Test