You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 17:45:11 UTC
[2/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process
[FLINK-5026] Rename TimelyFlatMap to Process
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/910f733f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/910f733f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/910f733f
Branch: refs/heads/master
Commit: 910f733f5ec52d2dd1e9dcc4ec6a4844cae2f2b4
Parents: 3a27f55
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 11 10:57:25 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 18:38:29 2016 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedStreams.java | 62 ++-
.../streaming/api/datastream/KeyedStream.java | 55 +-
.../api/functions/ProcessFunction.java | 109 ++++
.../api/functions/RichProcessFunction.java | 40 ++
.../functions/RichTimelyFlatMapFunction.java | 40 --
.../api/functions/TimelyFlatMapFunction.java | 110 ----
.../api/functions/co/CoProcessFunction.java | 130 +++++
.../api/functions/co/RichCoProcessFunction.java | 41 ++
.../co/RichTimelyCoFlatMapFunction.java | 41 --
.../functions/co/TimelyCoFlatMapFunction.java | 128 -----
.../api/operators/ProcessOperator.java | 151 ++++++
.../api/operators/StreamTimelyFlatMap.java | 151 ------
.../api/operators/co/CoProcessOperator.java | 167 ++++++
.../api/operators/co/CoStreamTimelyFlatMap.java | 167 ------
.../flink/streaming/api/DataStreamTest.java | 23 +-
.../api/operators/ProcessOperatorTest.java | 404 ++++++++++++++
.../api/operators/TimelyFlatMapTest.java | 404 --------------
.../api/operators/co/CoProcessOperatorTest.java | 536 +++++++++++++++++++
.../api/operators/co/TimelyCoFlatMapTest.java | 536 -------------------
.../streaming/api/scala/ConnectedStreams.scala | 39 +-
.../flink/streaming/api/scala/KeyedStream.scala | 24 +-
.../streaming/api/scala/DataStreamTest.scala | 20 +-
22 files changed, 1701 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index dc763cb..96a08d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -27,11 +27,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import static java.util.Objects.requireNonNull;
@@ -234,64 +235,71 @@ public class ConnectedStreams<IN1, IN2> {
}
/**
- * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * Applies the given {@link CoProcessFunction} on the connected input streams,
* thereby creating a transformed output stream.
*
- * <p>The function will be called for every element in the streams and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input streams and can produce zero or
+ * more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, this
+ * function can also query the time and set timers. When reacting to the firing of set timers
+ * the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(
+ CoProcessFunction<IN1, IN2, R> coProcessFunction) {
- TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
- TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+ TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
+ CoProcessFunction.class, false, true, getType1(), getType2(),
Utils.getCallLocationName(), true);
- return flatMap(coFlatMapper, outTypeInfo);
+ return process(coProcessFunction, outTypeInfo);
}
/**
- * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+ * Applies the given {@link CoProcessFunction} on the connected input streams,
* thereby creating a transformed output stream.
*
- * <p>The function will be called for every element in the streams and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function,
+ * this function can also query the time and set timers. When reacting to the firing of set
+ * timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+ * <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+ * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+ public <R> SingleOutputStreamOperator<R> process(
+ CoProcessFunction<IN1, IN2, R> coProcessFunction,
TypeInformation<R> outputType) {
- CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
- inputStream1.clean(coFlatMapper));
+ if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
+ throw new UnsupportedOperationException("A CoProcessFunction can only be applied " +
+ "when both input streams are keyed.");
+ }
- return transform("Co-Flat Map", outputType, operator);
- }
+ CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
+ inputStream1.clean(coProcessFunction));
+ return transform("Co-Process", outputType, operator);
+ }
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(String functionName,
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 5b00bcd..560ecab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -32,7 +33,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -42,7 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -173,67 +175,70 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
}
/**
- * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
- * <p>The function will be called for every element in the stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input stream and can produce zero
+ * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+ * function, this function can also query the time and set timers. When reacting to the firing
+ * of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
- * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
- public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
- flatMapper,
- TimelyFlatMapFunction.class,
+ processFunction,
+ ProcessFunction.class,
false,
true,
getType(),
Utils.getCallLocationName(),
true);
- return flatMap(flatMapper, outType);
+ return process(processFunction, outType);
}
/**
- * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
- * <p>The function will be called for every element in the stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>The function will be called for every element in the input stream and can produce zero
+ * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+ * function, this function can also query the time and set timers. When reacting to the firing
+ * of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
- * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
*
- * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
- public <R> SingleOutputStreamOperator<R> flatMap(
- TimelyFlatMapFunction<T, R> flatMapper,
+ public <R> SingleOutputStreamOperator<R> process(
+ ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {
- StreamTimelyFlatMap<KEY, T, R> operator =
- new StreamTimelyFlatMap<>(clean(flatMapper));
+ ProcessOperator<KEY, T, R> operator =
+ new ProcessOperator<>(clean(processFunction));
- return transform("Flat Map", outputType, operator);
+ return transform("Process", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
new file mode 100644
index 0000000..fd0a829
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function that processes elements of a stream.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output elements.
+ *
+ * <p>Contrary to the {@link org.apache.flink.api.common.functions.FlatMapFunction}, this
+ * function can also query the time (both event and processing) and set timers, through
+ * the provided {@link Context}.
+ *
+ * <p>When reacting to the firing of set timers the function can directly emit yet more
+ * elements, and/or register further timers that will trigger an action in the
+ * future.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public interface ProcessFunction<I, O> extends Function {
+
+ /**
+ * Process one element from the input stream.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The input value.
+ * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+ * a {@link TimerService} for registering timers and querying the time. The
+ * context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+
+ /**
+ * Called when a timer set using {@link TimerService} fires.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+ * querying the {@link TimeDomain} of the firing timer and getting a
+ * {@link TimerService} for registering timers and querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+
+ /**
+ * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
+ * or {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface Context {
+
+ /**
+ * Timestamp of the element currently being processed or timestamp of a firing timer.
+ *
+ * <p>This might be {@code null}, for example if the time characteristic of your program
+ * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */
+ Long timestamp();
+
+ /**
+ * A {@link TimerService} for querying time and registering timers.
+ */
+ TimerService timerService();
+ }
+
+ /**
+ * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface OnTimerContext extends Context {
+ /**
+ * The {@link TimeDomain} of the firing timer.
+ */
+ TimeDomain timeDomain();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
new file mode 100644
index 0000000..834f717
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link ProcessFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichProcessFunction<I, O>
+ extends AbstractRichFunction
+ implements ProcessFunction<I, O> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
deleted file mode 100644
index 0d86da9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link TimelyFlatMapFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyFlatMapFunction<I, O>
- extends AbstractRichFunction
- implements TimelyFlatMapFunction<I, O> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
deleted file mode 100644
index 5f039c4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays.
- *
- * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
- * to them firing.
- *
- * <pre>{@code
- * DataStream<X> input = ...;
- *
- * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
- * }</pre>
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
-
- /**
- * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
- * it into zero, one, or more elements.
- *
- * @param value The input value.
- * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
- * a {@link TimerService} for registering timers and querying the time. The
- * context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
-
- /**
- * Called when a timer set using {@link TimerService} fires.
- *
- * @param timestamp The timestamp of the firing timer.
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
-
- /**
- * Information available in an invocation of {@link #flatMap(Object, Context, Collector)}
- * or {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface Context {
-
- /**
- * Timestamp of the element currently being processed or timestamp of a firing timer.
- *
- * <p>This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
- */
- Long timestamp();
-
- /**
- * A {@link TimerService} for querying time and registering timers.
- */
- TimerService timerService();
- }
-
- /**
- * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface OnTimerContext extends Context {
- /**
- * The {@link TimeDomain} of the firing timer.
- */
- TimeDomain timeDomain();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
new file mode 100644
index 0000000..feff8fb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A function that processes elements of two streams and produces a single output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce
+ * zero or more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also
+ * query the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>An example use-case for connected streams would be the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream ({@code stream B}). The rules
+ * contained in {@code stream A} can be stored in the state and wait for new elements to arrive on
+ * {@code stream B}. Upon reception of a new element on {@code stream B}, the function can now apply the
+ * previously stored rules to the element and directly emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+ /**
+ * This method is called for each element in the first of the connected streams.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The stream element
+ * @param ctx A {@link Context} that allows querying the timestamp of the element
+ * and getting a {@link TimerService} for registering timers and querying the time.
+ * Unlike an {@link OnTimerContext}, it does not expose a {@link TimeDomain}.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector to emit resulting elements to
+ * @throws Exception The function may throw exceptions which cause the streaming program
+ * to fail and go into recovery.
+ */
+ void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * This method is called for each element in the second of the connected streams.
+ *
+ * <p>This function can output zero or more elements using the {@link Collector} parameter
+ * and also update internal state or set timers using the {@link Context} parameter.
+ *
+ * @param value The stream element
+ * @param ctx A {@link Context} that allows querying the timestamp of the element
+ * and getting a {@link TimerService} for registering timers and querying the time.
+ * Unlike an {@link OnTimerContext}, it does not expose a {@link TimeDomain}.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector to emit resulting elements to
+ * @throws Exception The function may throw exceptions which cause the streaming program
+ * to fail and go into recovery.
+ */
+ void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+ /**
+ * Called when a timer set using {@link TimerService} fires.
+ *
+ * @param timestamp The timestamp of the firing timer.
+ * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+ * querying the {@link TimeDomain} of the firing timer and getting a
+ * {@link TimerService} for registering timers and querying the time.
+ * The context is only valid during the invocation of this method, do not store it.
+ * @param out The collector for returning result values.
+ *
+ * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+ * to fail and may trigger recovery.
+ */
+ void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+
+ /**
+ * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
+ * {@link #processElement2(Object, Context, Collector)}
+ * or {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface Context {
+
+ /**
+ * Timestamp of the element currently being processed or timestamp of a firing timer.
+ *
+ * <p>This might be {@code null}, for example if the time characteristic of your program
+ * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+ */
+ Long timestamp();
+
+ /**
+ * A {@link TimerService} for querying time and registering timers.
+ */
+ TimerService timerService();
+ }
+
+ /**
+ * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+ */
+ interface OnTimerContext extends Context {
+ /**
+ * The {@link TimeDomain} of the firing timer.
+ */
+ TimeDomain timeDomain();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
new file mode 100644
index 0000000..0fcea91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichCoProcessFunction<IN1, IN2, OUT>
+ extends AbstractRichFunction
+ implements CoProcessFunction<IN1, IN2, OUT> {
+
+ private static final long serialVersionUID = 1L;
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
deleted file mode 100644
index 12fe181..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
- extends AbstractRichFunction
- implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
deleted file mode 100644
index 89c7d79..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
- * connected streams.
- *
- * <p>The same instance of the transformation function is used to transform
- * both of the connected streams. That way, the stream transformations can
- * share state.
- *
- * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
- * {@link CoFlatMapFunction}, also set timers and react to them firing.
- *
- * <p>An example for the use of connected streams would be to apply rules that change over time
- * onto elements of a stream. One of the connected streams has the rules, the other stream the
- * elements to apply the rules to. The operation on the connected stream maintains the
- * current set of rules in the state. It may receive either a rule update (from the first stream)
- * and update the state, or a data element (from the second stream) and apply the rules in the
- * state to the element. The result of applying the rules would be emitted.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Output type.
- */
-@PublicEvolving
-public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
- /**
- * This method is called for each element in the first of the connected streams.
- *
- * @param value The stream element
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector to emit resulting elements to
- * @throws Exception The function may throw exceptions which cause the streaming program
- * to fail and go into recovery.
- */
- void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * This method is called for each element in the second of the connected streams.
- *
- * @param value The stream element
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector to emit resulting elements to
- * @throws Exception The function may throw exceptions which cause the streaming program
- * to fail and go into recovery.
- */
- void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
-
- /**
- * Called when a timer set using {@link TimerService} fires.
- *
- * @param timestamp The timestamp of the firing timer.
- * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
- * querying the {@link TimeDomain} of the firing timer and getting a
- * {@link TimerService} for registering timers and querying the time.
- * The context is only valid during the invocation of this method, do not store it.
- * @param out The collector for returning result values.
- *
- * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
- * to fail and may trigger recovery.
- */
- void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
-
- /**
- * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/
- * {@link #flatMap2(Object, Context, Collector)}
- * or {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface Context {
-
- /**
- * Timestamp of the element currently being processed or timestamp of a firing timer.
- *
- * <p>This might be {@code null}, for example if the time characteristic of your program
- * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
- */
- Long timestamp();
-
- /**
- * A {@link TimerService} for querying time and registering timers.
- */
- TimerService timerService();
- }
-
- /**
- * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
- */
- interface OnTimerContext extends TimelyFlatMapFunction.Context {
- /**
- * The {@link TimeDomain} of the firing timer.
- */
- TimeDomain timeDomain();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
new file mode 100644
index 0000000..3b13360
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class ProcessOperator<K, IN, OUT>
+ extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient TimerService timerService;
+
+ private transient ContextImpl<IN> context;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
+ super(flatMapper);
+
+ chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ this.timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(timerService);
+ onTimerContext = new OnTimerContextImpl(timerService);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ private static class ContextImpl<T> implements ProcessFunction.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<T> element;
+
+ ContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
deleted file mode 100644
index bafc435..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class StreamTimelyFlatMap<K, IN, OUT>
- extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector;
-
- private transient TimerService timerService;
-
- private transient ContextImpl<IN> context;
-
- private transient OnTimerContextImpl onTimerContext;
-
- public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
- super(flatMapper);
-
- chainingStrategy = ChainingStrategy.ALWAYS;
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- collector = new TimestampedCollector<>(output);
-
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
- this.timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl<>(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
- }
-
- @Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void processElement(StreamRecord<IN> element) throws Exception {
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap(element.getValue(), context, collector);
- context.element = null;
- }
-
- private static class ContextImpl<T> implements TimelyFlatMapFunction.Context {
-
- private final TimerService timerService;
-
- private StreamRecord<T> element;
-
- ContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public Long timestamp() {
- checkState(element != null);
-
- if (element.hasTimestamp()) {
- return element.getTimestamp();
- } else {
- return null;
- }
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-
- private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{
-
- private final TimerService timerService;
-
- private TimeDomain timeDomain;
-
- private InternalTimer<?, VoidNamespace> timer;
-
- OnTimerContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService);
- }
-
- @Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
- public Long timestamp() {
- checkState(timer != null);
- return timer.getTimestamp();
- }
-
- @Override
- public TimerService timerService() {
- return timerService;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
new file mode 100644
index 0000000..e6c3d3f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class CoProcessOperator<K, IN1, IN2, OUT>
+ extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient TimerService timerService;
+
+ private transient ContextImpl context;
+
+ private transient OnTimerContextImpl onTimerContext;
+
+ public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+ super(flatMapper);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ this.timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl(timerService);
+ onTimerContext = new OnTimerContextImpl(timerService);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement1(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement2(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ protected TimestampedCollector<OUT> getCollector() {
+ return collector;
+ }
+
+ private static class ContextImpl implements CoProcessFunction.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<?> element;
+
+ ContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(TimerService timerService) {
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
deleted file mode 100644
index 75e4c14..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT> // two-input operator that runs a TimelyCoFlatMapFunction and forwards timer callbacks to its onTimer
- extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
-
- private static final long serialVersionUID = 1L;
-
- private transient TimestampedCollector<OUT> collector; // created in open() around `output`
-
- private transient TimerService timerService; // SimpleTimerService created in open() over the "user-timers" internal timer service
-
- private transient ContextImpl context; // single instance reused for every element of both inputs
-
- private transient OnTimerContextImpl onTimerContext; // single instance reused for every timer firing
-
- public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) { // hands the user function to the superclass constructor
- super(flatMapper);
- }
-
- @Override
- public void open() throws Exception { // builds the collector, the timer service and the two reusable context objects
- super.open();
- collector = new TimestampedCollector<>(output);
-
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this); // `this` is the third argument; presumably the Triggerable that receives the callbacks below (confirm)
-
- this.timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
- }
-
- @Override
- public void processElement1(StreamRecord<IN1> element) throws Exception { // first input: invokes flatMap1 with the element exposed through `context`
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap1(element.getValue(), context, collector);
- context.element = null; // cleared so Context.timestamp() fails its checkState once the call has returned
- }
-
- @Override
- public void processElement2(StreamRecord<IN2> element) throws Exception { // second input: same sequence as processElement1, but invokes flatMap2
- collector.setTimestamp(element);
- context.element = element;
- userFunction.flatMap2(element.getValue(), context, collector);
- context.element = null; // cleared so Context.timestamp() fails its checkState once the call has returned
- }
-
- @Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception { // timer callback: invokes onTimer with the domain reported as EVENT_TIME
- collector.setAbsoluteTimestamp(timer.getTimestamp()); // the collector is given the timer's timestamp before onTimer runs
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null; // both fields are reset so the context's checkState guards fail outside onTimer
- onTimerContext.timer = null;
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception { // timer callback: same sequence as onEventTime, with the domain reported as PROCESSING_TIME
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null; // both fields are reset so the context's checkState guards fail outside onTimer
- onTimerContext.timer = null;
- }
-
- protected TimestampedCollector<OUT> getCollector() { // protected accessor that exposes the `collector` field unchanged
- return collector;
- }
-
- private static class ContextImpl implements TimelyCoFlatMapFunction.Context { // context passed to flatMap1/flatMap2
-
- private final TimerService timerService;
-
- private StreamRecord<?> element; // set by processElement1/processElement2 around the user call, null otherwise
-
- ContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService); // a null timer service is rejected at construction
- }
-
- @Override
- public Long timestamp() { // timestamp of the current element, or null when the element carries none
- checkState(element != null);
-
- if (element.hasTimestamp()) {
- return element.getTimestamp();
- } else {
- return null; // the boxed Long return type lets "no timestamp" be reported as null
- }
- }
-
- @Override
- public TimerService timerService() { // always the instance supplied to the constructor
- return timerService;
- }
- }
-
- private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext { // context passed to onTimer
-
- private final TimerService timerService;
-
- private TimeDomain timeDomain; // set by onEventTime/onProcessingTime around the user call, null otherwise
-
- private InternalTimer<?, VoidNamespace> timer; // set by onEventTime/onProcessingTime around the user call, null otherwise
-
- OnTimerContextImpl(TimerService timerService) {
- this.timerService = checkNotNull(timerService); // a null timer service is rejected at construction
- }
-
- @Override
- public TimeDomain timeDomain() { // only valid while `timeDomain` is set, i.e. during an onTimer call
- checkState(timeDomain != null);
- return timeDomain;
- }
-
- @Override
- public Long timestamp() { // timestamp of the firing timer; only valid while `timer` is set
- checkState(timer != null);
- return timer.getTimestamp();
- }
-
- @Override
- public TimerService timerService() { // always the instance supplied to the constructor
- return timerService;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8f002ba..eaac6b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -547,18 +547,19 @@ public class DataStreamTest {
}
/**
- * Verify that a timely flat map call is correctly translated to an operator.
+ * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to
+ * an operator.
*/
@Test
- public void testTimelyFlatMapTranslation() {
+ public void testProcessTranslation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);
- TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+ ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
private static final long serialVersionUID = 1L;
@Override
- public void flatMap(
+ public void processElement(
Long value,
Context ctx,
Collector<Integer> out) throws Exception {
@@ -574,14 +575,14 @@ public class DataStreamTest {
}
};
- DataStream<Integer> flatMapped = src
+ DataStream<Integer> processed = src
.keyBy(new IdentityKeySelector<Long>())
- .flatMap(timelyFlatMapFunction);
+ .process(processFunction);
- flatMapped.addSink(new DiscardingSink<Integer>());
+ processed.addSink(new DiscardingSink<Integer>());
- assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
- assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+ assertEquals(processFunction, getFunctionForDataStream(processed));
+ assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
}
@Test