Posted to commits@flink.apache.org by al...@apache.org on 2017/03/06 15:52:09 UTC
[1/5] flink git commit: [FLINK-4460] Update doc: ProcessFunction now
possible on DataStream
Repository: flink
Updated Branches:
refs/heads/master 62517ca4b -> 746c1efa5
[FLINK-4460] Update doc: ProcessFunction now possible on DataStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/746c1efa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/746c1efa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/746c1efa
Branch: refs/heads/master
Commit: 746c1efa504267e18ddc6305712d4c3717ba4cb4
Parents: 06740fb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 6 12:24:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/process_function.md | 33 +++++++++++++++++---------------
1 file changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/746c1efa/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index 22295be..1f93f68 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -32,32 +32,35 @@ The `ProcessFunction` is a low-level stream processing operation, giving access
all (acyclic) streaming applications:
- events (stream elements)
- - state (fault tolerant, consistent)
- - timers (event time and processing time)
+ - state (fault tolerant, consistent, only on keyed stream)
+ - timers (event time and processing time, only on keyed stream)
The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
by being invoked for each event received in the input stream(s).
For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
-`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
-the `ProcessFunction` needs to be applied onto a `KeyedStream`:
-```java
-stream.keyBy("id").process(new MyProcessFunction())
-```
+`RuntimeContext`, similar to the way other stateful functions can access keyed state.
The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's
event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
-event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
+event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.
+<span class="label label-info">Note</span> If you want to access keyed state and timers you have
+to apply the `ProcessFunction` on a keyed stream:
+
+{% highlight java %}
+stream.keyBy(...).process(new MyProcessFunction())
+{% endhighlight %}
+
## Low-level Joins
-To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
-in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
-gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
+To realize low-level operations on two inputs, applications can use `CoProcessFunction`. This
+function is bound to two different inputs and gets individual calls to `processElement1(...)` and
+`processElement2(...)` for records from the two different inputs.
Implementing a low level join typically follows this pattern:
@@ -82,8 +85,8 @@ The following example maintains counts per key, and emits a key/count pair whene
- Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
and emits the key/count if they match (i.e., no further update occurred during that minute)
-*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
-the basic pattern it provides.
+<span class="label label-info">Note</span> This simple example could have been implemented with
+session windows. We use `ProcessFunction` here to illustrate the basic pattern it provides.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -207,7 +210,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
- case null =>
+ case null =>
CountWithTimestamp(key, 1, ctx.timestamp)
case CountWithTimestamp(key, count, time) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
@@ -222,7 +225,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
- case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
+ case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
out.collect((key, count))
case _ =>
}
[3/5] flink git commit: [FLINK-4460] Make ProcessFunction abstract,
add default onTime() method
Posted by al...@apache.org.
[FLINK-4460] Make ProcessFunction abstract, add default onTime() method
This is in preparation of allowing ProcessFunction on DataStream because
we will use it to allow side outputs from the ProcessFunction Context.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82eddcad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82eddcad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82eddcad
Branch: refs/heads/master
Commit: 82eddcad9674b09e90db921bc874b43b64796420
Parents: 62517ca
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 10:57:12 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 9 ----
.../api/functions/ProcessFunction.java | 43 ++++++++++++--------
.../api/functions/RichProcessFunction.java | 40 ------------------
.../api/operators/ProcessOperator.java | 24 +++++------
.../api/operators/ProcessOperatorTest.java | 9 ++--
.../flink/streaming/api/scala/KeyedStream.scala | 6 +--
.../streaming/api/scala/DataStreamTest.scala | 8 ++--
7 files changed, 45 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7f33275..1fcd5bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -181,10 +180,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link RichProcessFunction}
- * can be used to gain access to features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
@@ -216,10 +211,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link RichProcessFunction}
- * can be used to gain access to features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index fd0a829..48418af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.functions;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
@@ -27,22 +27,29 @@ import org.apache.flink.util.Collector;
/**
* A function that processes elements of a stream.
*
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
*
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = 1L;
/**
* Process one element from the input stream.
@@ -59,7 +66,7 @@ public interface ProcessFunction<I, O> extends Function {
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
- void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+ public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
/**
* Called when a timer set using {@link TimerService} fires.
@@ -74,13 +81,13 @@ public interface ProcessFunction<I, O> extends Function {
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
- void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
/**
* Information available in an invocation of {@link #processElement(Object, Context, Collector)}
* or {@link #onTimer(long, OnTimerContext, Collector)}.
*/
- interface Context {
+ public abstract class Context {
/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
@@ -88,22 +95,22 @@ public interface ProcessFunction<I, O> extends Function {
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
- Long timestamp();
+ public abstract Long timestamp();
/**
* A {@link TimerService} for querying time and registering timers.
*/
- TimerService timerService();
+ public abstract TimerService timerService();
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
- interface OnTimerContext extends Context {
+ public abstract class OnTimerContext extends Context {
/**
* The {@link TimeDomain} of the firing timer.
*/
- TimeDomain timeDomain();
+ public abstract TimeDomain timeDomain();
}
}
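
Editorial sketch (not part of the commit): the hunk above turns ProcessFunction from an interface into an abstract class whose onTimer(...) has an empty default body, so implementations that never register timers only need to override processElement(...). The following minimal, Flink-free illustration shows that shape. SketchProcessFunction, UpperCaseFunction and the use of java.util.function.Consumer in place of Collector are assumptions made for this example and do not appear in the Flink code base.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for the reworked ProcessFunction; not the Flink class.
abstract class SketchProcessFunction<I, O> {

    // Every subclass has to implement element handling.
    public abstract void processElement(I value, Consumer<O> out) throws Exception;

    // Empty default: subclasses that never register timers need not override this.
    public void onTimer(long timestamp, Consumer<O> out) throws Exception {}
}

// Overrides only processElement and inherits the no-op onTimer.
class UpperCaseFunction extends SketchProcessFunction<String, String> {
    @Override
    public void processElement(String value, Consumer<String> out) {
        out.accept(value.toUpperCase());
    }
}

public class AbstractFunctionSketch {

    static List<String> run() throws Exception {
        List<String> results = new ArrayList<>();
        SketchProcessFunction<String, String> fn = new UpperCaseFunction();
        fn.processElement("a", results::add);
        fn.onTimer(42L, results::add); // inherited no-op, emits nothing
        fn.processElement("b", results::add);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

With an interface (before Java 8 default methods were in use here), every implementation had to spell out onTimer(...) even when it was unused; the abstract class removes that obligation, at the cost of using up the single superclass slot.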
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
deleted file mode 100644
index 834f717..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link ProcessFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichProcessFunction<I, O>
- extends AbstractRichFunction
- implements ProcessFunction<I, O> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 3b13360..7b3617c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -38,11 +38,9 @@ public class ProcessOperator<K, IN, OUT>
private transient TimestampedCollector<OUT> collector;
- private transient TimerService timerService;
+ private transient ContextImpl<IN, OUT> context;
- private transient ContextImpl<IN> context;
-
- private transient OnTimerContextImpl onTimerContext;
+ private transient OnTimerContextImpl<IN, OUT> onTimerContext;
public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
super(flatMapper);
@@ -58,10 +56,10 @@ public class ProcessOperator<K, IN, OUT>
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
- this.timerService = new SimpleTimerService(internalTimerService);
+ TimerService timerService = new SimpleTimerService(internalTimerService);
- context = new ContextImpl<>(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
+ context = new ContextImpl<>(userFunction, timerService);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
}
@Override
@@ -92,13 +90,14 @@ public class ProcessOperator<K, IN, OUT>
context.element = null;
}
- private static class ContextImpl<T> implements ProcessFunction.Context {
+ private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
private final TimerService timerService;
- private StreamRecord<T> element;
+ private StreamRecord<IN> element;
- ContextImpl(TimerService timerService) {
+ ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+ function.super();
this.timerService = checkNotNull(timerService);
}
@@ -119,7 +118,7 @@ public class ProcessOperator<K, IN, OUT>
}
}
- private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+ private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
private final TimerService timerService;
@@ -127,7 +126,8 @@ public class ProcessOperator<K, IN, OUT>
private InternalTimer<?, VoidNamespace> timer;
- OnTimerContextImpl(TimerService timerService) {
+ OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+ function.super();
this.timerService = checkNotNull(timerService);
}
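
Editorial sketch (not part of the commit): the `function.super()` calls in the hunks above are Java's qualified superclass constructor invocation. Because Context and OnTimerContext are now non-static inner classes of ProcessFunction, a static nested class in another file that extends them has to name the enclosing instance explicitly. The example below shows the same language feature in isolation; Outer, Inner and InnerImpl are hypothetical names chosen for illustration.

```java
// Hypothetical outer class with a non-static abstract inner class, mirroring
// the shape of ProcessFunction<I, O>.Context in the diff above.
abstract class Outer<T> {
    final String name;

    Outer(String name) {
        this.name = name;
    }

    public abstract class Inner {
        public abstract T value();

        // An inner class can read state of its enclosing instance.
        public String owner() {
            return name;
        }
    }
}

public class QualifiedSuperSketch {

    // A static nested class extending another class's inner class must
    // supply the enclosing instance when calling the super constructor.
    private static class InnerImpl<T> extends Outer<T>.Inner {
        private final T value;

        InnerImpl(Outer<T> outer, T value) {
            outer.super(); // binds this Inner to 'outer' as its enclosing instance
            this.value = value;
        }

        @Override
        public T value() {
            return value;
        }
    }

    static String run() {
        Outer<Integer> outer = new Outer<Integer>("fn") {};
        Outer<Integer>.Inner inner = new InnerImpl<>(outer, 7);
        return inner.owner() + ":" + inner.value();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This is why the ContextImpl and OnTimerContextImpl constructors gained a ProcessFunction parameter: without an enclosing instance to pass to `super`, the subclass would not compile.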
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 89d9899..b946646 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -286,7 +285,7 @@ public class ProcessOperatorTest extends TestLogger {
}
}
- private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> {
+ private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@@ -313,7 +312,7 @@ public class ProcessOperatorTest extends TestLogger {
}
}
- private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> {
+ private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
private static final long serialVersionUID = 1L;
@@ -344,7 +343,7 @@ public class ProcessOperatorTest extends TestLogger {
}
}
- private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> {
+ private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
private static final long serialVersionUID = 1L;
@@ -378,7 +377,7 @@ public class ProcessOperatorTest extends TestLogger {
}
}
- private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> {
+ private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 99936e7..489b571 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
+import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -66,10 +66,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
- * A [[RichProcessFunction]]
- * can be used to gain access to features provided by the
- * [[org.apache.flink.api.common.functions.RichFunction]]
- *
* @param processFunction The [[ProcessFunction]] that is called for each element
* in the stream.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 841567a..b546577 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.ResourceSpec
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
@@ -404,10 +403,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val src = env.generateSequence(0, 0)
val processFunction = new ProcessFunction[Long, Int] {
- override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
- override def onTimer(
- timestamp: Long,
- ctx: OnTimerContext,
+ override def processElement(
+ value: Long,
+ ctx: ProcessFunction[Long, Int]#Context,
out: Collector[Int]): Unit = ???
}
[5/5] flink git commit: [FLINK-4660] Allow ProcessFunction on
DataStream
Posted by al...@apache.org.
[FLINK-4660] Allow ProcessFunction on DataStream
Introduce new ProcessOperator for this. Rename the pre-existing
ProcessOperator to KeyedProcessOperator.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0228676d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0228676d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0228676d
Branch: refs/heads/master
Commit: 0228676d689b04accf1ba7d7c7f8064e121c6d88
Parents: 82eddca
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 11:41:02 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 56 +++
.../streaming/api/datastream/KeyedStream.java | 8 +-
.../api/operators/KeyedProcessOperator.java | 151 +++++++
.../api/operators/ProcessOperator.java | 95 ++---
.../flink/streaming/api/DataStreamTest.java | 45 ++-
.../api/operators/KeyedProcessOperatorTest.java | 403 +++++++++++++++++++
.../api/operators/ProcessOperatorTest.java | 298 +-------------
.../flink/streaming/api/scala/DataStream.scala | 24 +-
.../flink/streaming/api/scala/KeyedStream.scala | 2 +-
.../streaming/api/scala/DataStreamTest.scala | 29 +-
10 files changed, 747 insertions(+), 364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c443758..8fcaf6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -59,6 +60,7 @@ import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
@@ -556,6 +558,60 @@ public class DataStream<T> {
}
/**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @PublicEvolving
+ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+ TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+ processFunction,
+ ProcessFunction.class,
+ false,
+ true,
+ getType(),
+ Utils.getCallLocationName(),
+ true);
+
+ return process(processFunction, outType);
+ }
+
+ /**
+ * Applies the given {@link ProcessFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the input streams and can produce zero
+ * or more output elements.
+ *
+ * @param processFunction The {@link ProcessFunction} that is called for each element
+ * in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> process(
+ ProcessFunction<T, R> processFunction,
+ TypeInformation<R> outputType) {
+
+ ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));
+
+ return transform("Process", outputType, operator);
+ }
+
+ /**
* Applies a Filter transformation on a {@link DataStream}. The
* transformation calls a {@link FilterFunction} for each element of the
* DataStream and retains only those element for which the function returns
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1fcd5bb..7c9f5bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperato
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -187,6 +187,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*
* @return The transformed {@link DataStream}.
*/
+ @Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
@@ -219,13 +220,14 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
*
* @return The transformed {@link DataStream}.
*/
+ @Override
@Internal
public <R> SingleOutputStreamOperator<R> process(
ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {
- ProcessOperator<KEY, T, R> operator =
- new ProcessOperator<>(clean(processFunction));
+ KeyedProcessOperator<KEY, T, R> operator =
+ new KeyedProcessOperator<>(clean(processFunction));
return transform("Process", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
new file mode 100644
index 0000000..d8dfb0f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class KeyedProcessOperator<K, IN, OUT>
+ extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+ implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient ContextImpl<IN, OUT> context;
+
+ private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+
+ public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+ super(function);
+
+ chainingStrategy = ChainingStrategy.ALWAYS;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ TimerService timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(userFunction, timerService);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void processElement(StreamRecord<IN> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<IN> element;
+
+ ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 7b3617c..13b68f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -18,32 +18,30 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@Internal
-public class ProcessOperator<K, IN, OUT>
+public class ProcessOperator<IN, OUT>
extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+ implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
- private transient ContextImpl<IN, OUT> context;
+ private transient ContextImpl context;
- private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+ /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+ private long currentWatermark = Long.MIN_VALUE;
- public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
- super(flatMapper);
+ public ProcessOperator(ProcessFunction<IN, OUT> function) {
+ super(function);
chainingStrategy = ChainingStrategy.ALWAYS;
}
@@ -53,33 +51,7 @@ public class ProcessOperator<K, IN, OUT>
super.open();
collector = new TimestampedCollector<>(output);
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
- TimerService timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl<>(userFunction, timerService);
- onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
- }
-
- @Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
-
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
+ context = new ContextImpl(userFunction, getProcessingTimeService());
}
@Override
@@ -90,15 +62,23 @@ public class ProcessOperator<K, IN, OUT>
context.element = null;
}
- private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+ @Override
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ this.currentWatermark = mark.getTimestamp();
+ }
- private final TimerService timerService;
+ private class ContextImpl
+ extends ProcessFunction<IN, OUT>.Context
+ implements TimerService {
private StreamRecord<IN> element;
- ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+ private final ProcessingTimeService processingTimeService;
+
+ ContextImpl(ProcessFunction<IN, OUT> function, ProcessingTimeService processingTimeService) {
function.super();
- this.timerService = checkNotNull(timerService);
+ this.processingTimeService = processingTimeService;
}
@Override
@@ -113,39 +93,28 @@ public class ProcessOperator<K, IN, OUT>
}
@Override
- public TimerService timerService() {
- return timerService;
+ public long currentProcessingTime() {
+ return processingTimeService.getCurrentProcessingTime();
}
- }
-
- private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
-
- private final TimerService timerService;
- private TimeDomain timeDomain;
-
- private InternalTimer<?, VoidNamespace> timer;
-
- OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
- function.super();
- this.timerService = checkNotNull(timerService);
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
}
@Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
+ public void registerProcessingTimeTimer(long time) {
+ throw new UnsupportedOperationException("Setting timers is only supported on a KeyedStream.");
}
@Override
- public Long timestamp() {
- checkState(timer != null);
- return timer.getTimestamp();
+ public void registerEventTimeTimer(long time) {
+ throw new UnsupportedOperationException("Setting timers is only supported on a KeyedStream.");
}
@Override
public TimerService timerService() {
- return timerService;
+ return this;
}
}
}
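
Note on the ProcessOperator change above: the non-keyed operator no longer has an InternalTimerService, so it records the latest watermark itself and rejects timer registration. The following is a minimal plain-Java sketch of that behaviour, not Flink code. NonKeyedTimerModel, TimerCalls, NonKeyedContext and timerRejected are hypothetical names used only for illustration; only the exception message and the Long.MIN_VALUE starting value are taken from the diff.

```java
// Plain-Java model only; none of these types are Flink classes.
public class NonKeyedTimerModel {

    /** Hypothetical stand-in for the timer-related calls a function can make. */
    interface TimerCalls {
        long currentWatermark();

        void registerEventTimeTimer(long time);
    }

    /** Models an operator without a timer service that tracks the watermark itself. */
    static final class NonKeyedContext implements TimerCalls {

        // Same starting value as the currentWatermark field in the diff.
        private long currentWatermark = Long.MIN_VALUE;

        /** Remembers the timestamp of the most recent watermark. */
        void processWatermark(long timestamp) {
            currentWatermark = timestamp;
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }

        @Override
        public void registerEventTimeTimer(long time) {
            // Timers need keyed state, so the non-keyed variant refuses them.
            throw new UnsupportedOperationException(
                "Setting timers is only supported on a KeyedStream.");
        }
    }

    /** Returns true if registering a timer was rejected with an exception. */
    static boolean timerRejected(TimerCalls calls, long time) {
        try {
            calls.registerEventTimeTimer(time);
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        NonKeyedContext ctx = new NonKeyedContext();
        ctx.processWatermark(17L);
        System.out.println("watermark: " + ctx.currentWatermark());
        System.out.println("timer rejected: " + timerRejected(ctx, 22L));
    }
}
```

In this model a function can still read the watermark when it runs on a non-keyed stream, but any attempt to set a timer fails immediately instead of being silently ignored.
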
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 12af1d4..a619338 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -47,8 +46,9 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -643,7 +643,7 @@ public class DataStreamTest {
* an operator.
*/
@Test
- public void testProcessTranslation() {
+ public void testKeyedProcessTranslation() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Long> src = env.generateSequence(0, 0);
@@ -674,9 +674,48 @@ public class DataStreamTest {
processed.addSink(new DiscardingSink<Integer>());
assertEquals(processFunction, getFunctionForDataStream(processed));
+ assertTrue(getOperatorForDataStream(processed) instanceof KeyedProcessOperator);
+ }
+
+ /**
+ * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to
+ * an operator.
+ */
+ @Test
+ public void testProcessTranslation() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+ ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(
+ Long value,
+ Context ctx,
+ Collector<Integer> out) throws Exception {
+
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Integer> out) throws Exception {
+
+ }
+ };
+
+ DataStream<Integer> processed = src
+ .process(processFunction);
+
+ processed.addSink(new DiscardingSink<Integer>());
+
+ assertEquals(processFunction, getFunctionForDataStream(processed));
assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
}
+
@Test
public void operatorTest() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
new file mode 100644
index 0000000..32953fc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link KeyedProcessOperator}.
+ */
+public class KeyedProcessOperatorTest extends TestLogger {
+
+ @Test
+ public void testTimestampAndWatermarkQuerying() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, String> operator =
+ new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(17));
+ testHarness.processElement(new StreamRecord<>(5, 12L));
+
+ testHarness.processWatermark(new Watermark(42));
+ testHarness.processElement(new StreamRecord<>(6, 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(17L));
+ expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+ expectedOutput.add(new Watermark(42L));
+ expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, String> operator =
+ new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(17);
+ testHarness.processElement(new StreamRecord<>(5));
+
+ testHarness.setProcessingTime(42);
+ testHarness.processElement(new StreamRecord<>(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+ expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testEventTimeTimers() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, Integer> operator =
+ new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(0));
+
+ testHarness.processElement(new StreamRecord<>(17, 42L));
+
+ testHarness.processWatermark(new Watermark(5));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(0L));
+ expectedOutput.add(new StreamRecord<>(17, 42L));
+ expectedOutput.add(new StreamRecord<>(1777, 5L));
+ expectedOutput.add(new Watermark(5L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTimers() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, Integer> operator =
+ new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(17));
+
+ testHarness.setProcessingTime(5);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>(17));
+ expectedOutput.add(new StreamRecord<>(1777, 5L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testEventTimeTimerWithState() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, String> operator =
+ new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark(new Watermark(1));
+ testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+ testHarness.processWatermark(new Watermark(2));
+ testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+ testHarness.processWatermark(new Watermark(6));
+ testHarness.processWatermark(new Watermark(7));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(1L));
+ expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+ expectedOutput.add(new Watermark(2L));
+ expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new Watermark(6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new Watermark(7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testProcessingTimeTimerWithState() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, String> operator =
+ new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(1);
+ testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+ testHarness.setProcessingTime(6);
+ testHarness.setProcessingTime(7);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT:42"));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+
+ KeyedProcessOperator<Integer, Integer, String> operator =
+ new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+ OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(5, 12L));
+
+ // snapshot and restore from scratch
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness.close();
+
+ operator = new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+ testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.setProcessingTime(5);
+ testHarness.processWatermark(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+ expectedOutput.add(new Watermark(6));
+
+ System.out.println("GOT: " + testHarness.getOutput());
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TimeDomain timeDomain;
+
+ public QueryingFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ } else {
+ out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ }
+ }
+
+ private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TimeDomain timeDomain;
+
+ public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+ out.collect(value);
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ } else {
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<Integer> out) throws Exception {
+
+ assertEquals(this.timeDomain, ctx.timeDomain());
+ out.collect(1777);
+ }
+ }
+
+ private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<Integer> state =
+ new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
+
+ private final TimeDomain timeDomain;
+
+ public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+ this.timeDomain = timeDomain;
+ }
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT:" + value);
+ getRuntimeContext().getState(state).update(value);
+ if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ } else {
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(this.timeDomain, ctx.timeDomain());
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerProcessingTimeTimer(5);
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+ out.collect("EVENT:1777");
+ } else {
+ out.collect("PROC:1777");
+ }
+ }
+ }
+
+}
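
Note on testEventTimeTimerWithState above: the expected output relies on two things, namely that state is scoped per key and that due timers fire before the watermark is forwarded. The following is a rough plain-Java sketch of that ordering, assuming an identity key selector as in the test. KeyedTimerModel and its members are hypothetical and do not correspond to Flink classes; the "WM:" entries are a stand-in for forwarded Watermark objects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model only; none of these types are Flink classes.
public class KeyedTimerModel {

    // One state slot per key; the key is the element value itself (identity key).
    private final Map<Integer, Integer> stateByKey = new HashMap<>();

    // Pending event-time timers, ordered by timestamp, each holding the keys to fire.
    private final TreeMap<Long, List<Integer>> timers = new TreeMap<>();

    private final List<String> output = new ArrayList<>();

    private long currentWatermark = Long.MIN_VALUE;

    /** Emits the input, stores it under its key, and sets a timer at watermark + 5. */
    void processElement(int value) {
        output.add("INPUT:" + value);
        stateByKey.put(value, value);
        timers.computeIfAbsent(currentWatermark + 5, t -> new ArrayList<>()).add(value);
    }

    /** Fires all timers that are due, then forwards the watermark. */
    void processWatermark(long timestamp) {
        currentWatermark = timestamp;
        while (!timers.isEmpty() && timers.firstKey() <= timestamp) {
            for (int key : timers.pollFirstEntry().getValue()) {
                // Each timer reads only the state stored under its own key.
                output.add("STATE:" + stateByKey.get(key));
            }
        }
        output.add("WM:" + timestamp);
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        KeyedTimerModel model = new KeyedTimerModel();
        model.processWatermark(1L);
        model.processElement(17); // timer at 6
        model.processWatermark(2L);
        model.processElement(42); // timer at 7
        model.processWatermark(6L);
        model.processWatermark(7L);
        System.out.println(model.output());
    }
}
```

Feeding the model the same sequence as the test yields the same relative order as the expectedOutput queue: each STATE entry appears directly before the watermark that triggered it, and each timer sees only the value stored for its own key.
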
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index b946646..c37fe48 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -18,16 +18,10 @@
package org.apache.flink.streaming.api.operators;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
@@ -36,8 +30,6 @@ import org.junit.Test;
import java.util.concurrent.ConcurrentLinkedQueue;
-import static org.junit.Assert.assertEquals;
-
/**
* Tests {@link ProcessOperator}.
*/
@@ -46,11 +38,11 @@ public class ProcessOperatorTest extends TestLogger {
@Test
public void testTimestampAndWatermarkQuerying() throws Exception {
- ProcessOperator<Integer, Integer, String> operator =
- new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+ ProcessOperator<Integer, String> operator =
+ new ProcessOperator<>(new QueryingProcessFunction(TimeDomain.EVENT_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+ new OneInputStreamOperatorTestHarness<>(operator);
testHarness.setup();
testHarness.open();
@@ -76,11 +68,11 @@ public class ProcessOperatorTest extends TestLogger {
@Test
public void testTimestampAndProcessingTimeQuerying() throws Exception {
- ProcessOperator<Integer, Integer, String> operator =
- new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+ ProcessOperator<Integer, String> operator =
+ new ProcessOperator<>(new QueryingProcessFunction(TimeDomain.PROCESSING_TIME));
OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+ new OneInputStreamOperatorTestHarness<>(operator);
testHarness.setup();
testHarness.open();
@@ -100,198 +92,13 @@ public class ProcessOperatorTest extends TestLogger {
testHarness.close();
}
-
- @Test
- public void testEventTimeTimers() throws Exception {
-
- ProcessOperator<Integer, Integer, Integer> operator =
- new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark(new Watermark(0));
-
- testHarness.processElement(new StreamRecord<>(17, 42L));
-
- testHarness.processWatermark(new Watermark(5));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(0L));
- expectedOutput.add(new StreamRecord<>(17, 42L));
- expectedOutput.add(new StreamRecord<>(1777, 5L));
- expectedOutput.add(new Watermark(5L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testProcessingTimeTimers() throws Exception {
-
- ProcessOperator<Integer, Integer, Integer> operator =
- new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(17));
-
- testHarness.setProcessingTime(5);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>(17));
- expectedOutput.add(new StreamRecord<>(1777, 5L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testEventTimeTimerWithState() throws Exception {
-
- ProcessOperator<Integer, Integer, String> operator =
- new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark(new Watermark(1));
- testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
-
- testHarness.processWatermark(new Watermark(2));
- testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
-
- testHarness.processWatermark(new Watermark(6));
- testHarness.processWatermark(new Watermark(7));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(1L));
- expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
- expectedOutput.add(new Watermark(2L));
- expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new Watermark(6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
- expectedOutput.add(new Watermark(7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testProcessingTimeTimerWithState() throws Exception {
-
- ProcessOperator<Integer, Integer, String> operator =
- new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(1);
- testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
-
- testHarness.setProcessingTime(2);
- testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
-
- testHarness.setProcessingTime(6);
- testHarness.setProcessingTime(7);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT:17"));
- expectedOutput.add(new StreamRecord<>("INPUT:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testSnapshotAndRestore() throws Exception {
-
- ProcessOperator<Integer, Integer, String> operator =
- new ProcessOperator<>(new BothTriggeringFlatMapFunction());
-
- OneInputStreamOperatorTestHarness<Integer, String> testHarness =
- new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement(new StreamRecord<>(5, 12L));
-
- // snapshot and restore from scratch
- OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
- testHarness.close();
-
- operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction());
-
- testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.open();
-
- testHarness.setProcessingTime(5);
- testHarness.processWatermark(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
- expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
- expectedOutput.add(new Watermark(6));
-
- System.out.println("GOT: " + testHarness.getOutput());
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- private static class IdentityKeySelector<T> implements KeySelector<T, T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public T getKey(T value) throws Exception {
- return value;
- }
- }
-
- private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+ private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {
private static final long serialVersionUID = 1L;
private final TimeDomain timeDomain;
- public QueryingFlatMapFunction(TimeDomain timeDomain) {
+ public QueryingProcessFunction(TimeDomain timeDomain) {
this.timeDomain = timeDomain;
}
@@ -311,93 +118,4 @@ public class ProcessOperatorTest extends TestLogger {
Collector<String> out) throws Exception {
}
}
-
- private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- private final TimeDomain timeDomain;
-
- public TriggeringFlatMapFunction(TimeDomain timeDomain) {
- this.timeDomain = timeDomain;
- }
-
- @Override
- public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
- out.collect(value);
- if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- } else {
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<Integer> out) throws Exception {
-
- assertEquals(this.timeDomain, ctx.timeDomain());
- out.collect(1777);
- }
- }
-
- private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<Integer> state =
- new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
-
- private final TimeDomain timeDomain;
-
- public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
- this.timeDomain = timeDomain;
- }
-
- @Override
- public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT:" + value);
- getRuntimeContext().getState(state).update(value);
- if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- } else {
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(this.timeDomain, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerProcessingTimeTimer(5);
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
- out.collect("EVENT:1777");
- } else {
- out.collect("PROC:1777");
- }
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 35e1f23..084d389 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
-import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, TimestampExtractor}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction, TimestampExtractor}
import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
@@ -619,6 +619,28 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
+ * Applies the given [[ProcessFunction]] on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * The function will be called for every element in the stream and can produce
+ * zero or more output elements.
+ *
+ * @param processFunction The [[ProcessFunction]] that is called for each element
+ * in the stream.
+ */
+ @PublicEvolving
+ def process[R: TypeInformation](
+ processFunction: ProcessFunction[T, R]): DataStream[R] = {
+
+ if (processFunction == null) {
+ throw new NullPointerException("ProcessFunction must not be null.")
+ }
+
+ asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
+ }
+
+
+ /**
* Creates a new DataStream that contains only the elements satisfying the given filter predicate.
*/
def filter(filter: FilterFunction[T]): DataStream[T] = {
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 489b571..d5ef89f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -70,7 +70,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* in the stream.
*/
@PublicEvolving
- def process[R: TypeInformation](
+ override def process[R: TypeInformation](
processFunction: ProcessFunction[T, R]): DataStream[R] = {
if (processFunction == null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b546577..60c609d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.co.CoMapFunction
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, KeyedProcessOperator, ProcessOperator, StreamOperator}
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -397,7 +397,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
* Verify that a [[KeyedStream.process()]] call is correctly translated to an operator.
*/
@Test
- def testProcessTranslation(): Unit = {
+ def testKeyedProcessTranslation(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val src = env.generateSequence(0, 0)
@@ -412,9 +412,32 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val flatMapped = src.keyBy(x => x).process(processFunction)
assert(processFunction == getFunctionForDataStream(flatMapped))
- assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]])
+ assert(getOperatorForDataStream(flatMapped).isInstanceOf[KeyedProcessOperator[_, _, _]])
+ }
+
+ /**
+ * Verify that a [[DataStream.process()]] call is correctly translated to an operator.
+ */
+ @Test
+ def testProcessTranslation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ val src = env.generateSequence(0, 0)
+
+ val processFunction = new ProcessFunction[Long, Int] {
+ override def processElement(
+ value: Long,
+ ctx: ProcessFunction[Long, Int]#Context,
+ out: Collector[Int]): Unit = ???
+ }
+
+ val flatMapped = src.process(processFunction)
+
+ assert(processFunction == getFunctionForDataStream(flatMapped))
+ assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
}
+
@Test def operatorTest() {
val env = StreamExecutionEnvironment.getExecutionEnvironment
[2/5] flink git commit: [FLINK-4460] Make CoProcessFunction abstract,
add default onTimer() method
Posted by al...@apache.org.
[FLINK-4460] Make CoProcessFunction abstract, add default onTimer() method
This is in preparation for allowing CoProcessFunction on a non-keyed
connected stream. We will also use it to allow side outputs from the
ProcessFunction Context.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e12f3203
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e12f3203
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e12f3203
Branch: refs/heads/master
Commit: e12f3203eda7f071daeea7c59ca32b3db7763b68
Parents: 0228676
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 11:33:03 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedStreams.java | 9 -----
.../api/functions/co/CoProcessFunction.java | 24 ++++++------
.../api/functions/co/RichCoProcessFunction.java | 41 --------------------
.../api/operators/co/CoProcessOperator.java | 23 +++++------
.../api/operators/co/CoProcessOperatorTest.java | 15 ++++---
.../streaming/api/scala/ConnectedStreams.scala | 2 +-
6 files changed, 32 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 96a08d3..66601a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
@@ -243,10 +242,6 @@ public class ConnectedStreams<IN1, IN2> {
* function can also query the time and set timers. When reacting to the firing of set timers
* the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link RichCoProcessFunction}
- * can be used to gain access to features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
* @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
@@ -274,10 +269,6 @@ public class ConnectedStreams<IN1, IN2> {
* this function can also query the time and set timers. When reacting to the firing of set
* timers the function can directly emit elements and/or register yet more timers.
*
- * <p>A {@link RichCoProcessFunction}
- * can be used to gain access to features provided by the
- * {@link org.apache.flink.api.common.functions.RichFunction} interface.
- *
* @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index feff8fb..8811e32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -19,13 +19,11 @@
package org.apache.flink.streaming.api.functions.co;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
-import java.io.Serializable;
-
/**
* A function that processes elements of two streams and produces a single output one.
*
@@ -46,7 +44,9 @@ import java.io.Serializable;
* @param <OUT> Output type.
*/
@PublicEvolving
-public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+ private static final long serialVersionUID = 1L;
/**
* This method is called for each element in the first of the connected streams.
@@ -63,7 +63,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
- void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+ public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* This method is called for each element in the second of the connected streams.
@@ -80,7 +80,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
- void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+ public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
/**
* Called when a timer set using {@link TimerService} fires.
@@ -95,14 +95,14 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
- void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+ public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
/**
* Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
* {@link #processElement2(Object, Context, Collector)}
* or {@link #onTimer(long, OnTimerContext, Collector)}.
*/
- interface Context {
+ public abstract class Context {
/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
@@ -110,21 +110,21 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
- Long timestamp();
+ public abstract Long timestamp();
/**
* A {@link TimerService} for querying time and registering timers.
*/
- TimerService timerService();
+ public abstract TimerService timerService();
}
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
- interface OnTimerContext extends Context {
+ public abstract class OnTimerContext extends Context {
/**
* The {@link TimeDomain} of the firing timer.
*/
- TimeDomain timeDomain();
+ public abstract TimeDomain timeDomain();
}
}
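The effect of turning the interface into an abstract class with an empty onTimer() body can be sketched without Flink on the classpath. The example below is a minimal illustration, not the Flink API: MiniCoProcessFunction, TaggingFunction and DefaultTimerSketch are hypothetical names, and a plain List stands in for Flink's Collector. It shows that a subclass now only has to implement the two processElement methods and inherits a no-op timer callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the shape of CoProcessFunction after this commit.
// A List<OUT> replaces Flink's Collector<OUT>; there is no Context argument here.
abstract class MiniCoProcessFunction<IN1, IN2, OUT> {

    // Still abstract: every subclass must handle both inputs.
    public abstract void processElement1(IN1 value, List<OUT> out) throws Exception;

    public abstract void processElement2(IN2 value, List<OUT> out) throws Exception;

    // Default no-op: subclasses that never register timers do not need to override this.
    public void onTimer(long timestamp, List<OUT> out) throws Exception {}
}

// A subclass that does not care about timers and therefore omits onTimer().
class TaggingFunction extends MiniCoProcessFunction<Integer, String, String> {

    @Override
    public void processElement1(Integer value, List<String> out) {
        out.add("INPUT1:" + value);
    }

    @Override
    public void processElement2(String value, List<String> out) {
        out.add("INPUT2:" + value);
    }
}

class DefaultTimerSketch {

    static List<String> run() throws Exception {
        List<String> out = new ArrayList<>();
        MiniCoProcessFunction<Integer, String, String> fn = new TaggingFunction();
        fn.processElement1(17, out);
        fn.processElement2("42", out);
        fn.onTimer(5L, out); // inherited no-op, adds nothing to the output
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

With an interface (before Java 8 default methods were an option for the code base), each implementation had to spell out an empty onTimer() itself, which is what the test functions in CoProcessOperatorTest did before this change.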
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
deleted file mode 100644
index 0fcea91..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichCoProcessFunction<IN1, IN2, OUT>
- extends AbstractRichFunction
- implements CoProcessFunction<IN1, IN2, OUT> {
-
- private static final long serialVersionUID = 1L;
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index e6c3d3f..ed99815 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -44,11 +44,9 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
private transient TimestampedCollector<OUT> collector;
- private transient TimerService timerService;
+ private transient ContextImpl<IN1, IN2, OUT> context;
- private transient ContextImpl context;
-
- private transient OnTimerContextImpl onTimerContext;
+ private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
@@ -62,10 +60,10 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
- this.timerService = new SimpleTimerService(internalTimerService);
+ TimerService timerService = new SimpleTimerService(internalTimerService);
- context = new ContextImpl(timerService);
- onTimerContext = new OnTimerContextImpl(timerService);
+ context = new ContextImpl<>(userFunction, timerService);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
}
@Override
@@ -108,13 +106,14 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
return collector;
}
- private static class ContextImpl implements CoProcessFunction.Context {
+ private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
private final TimerService timerService;
private StreamRecord<?> element;
- ContextImpl(TimerService timerService) {
+ ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+ function.super();
this.timerService = checkNotNull(timerService);
}
@@ -135,7 +134,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
}
}
- private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+ private static class OnTimerContextImpl<IN1, IN2, OUT>
+ extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
private final TimerService timerService;
@@ -143,7 +143,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
private InternalTimer<?, VoidNamespace> timer;
- OnTimerContextImpl(TimerService timerService) {
+ OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+ function.super();
this.timerService = checkNotNull(timerService);
}
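The `function.super()` calls in the hunks above are Java's qualified superclass constructor invocation. Because Context is now a non-static inner class of CoProcessFunction, a static nested subclass has no implicit enclosing instance and must name one explicitly. The sketch below reproduces only that language mechanism under simplified assumptions: MiniFunction, MiniOperator and QualifiedSuperSketch are hypothetical names and are not part of Flink.

```java
// Hypothetical stand-in for a function type whose Context is an inner (non-static) class.
abstract class MiniFunction<T> {

    public abstract String describe(T value, Context ctx);

    // Inner class: every instance is tied to an enclosing MiniFunction instance.
    public abstract class Context {
        public abstract long timestamp();
    }
}

// Hypothetical stand-in for the operator that owns the context implementation.
class MiniOperator<T> {

    private final MiniFunction<T> userFunction;
    private final ContextImpl<T> context;

    MiniOperator(MiniFunction<T> userFunction) {
        this.userFunction = userFunction;
        this.context = new ContextImpl<>(userFunction);
    }

    String process(T value, long timestamp) {
        context.timestamp = timestamp;
        return userFunction.describe(value, context);
    }

    // Static nested class extending another class's inner class.
    private static class ContextImpl<T> extends MiniFunction<T>.Context {

        long timestamp;

        ContextImpl(MiniFunction<T> function) {
            // Supplies the enclosing MiniFunction instance for the inner superclass.
            function.super();
        }

        @Override
        public long timestamp() {
            return timestamp;
        }
    }
}

class QualifiedSuperSketch {

    static String run() {
        MiniFunction<Integer> fn = new MiniFunction<Integer>() {
            @Override
            public String describe(Integer value, Context ctx) {
                return value + "@" + ctx.timestamp();
            }
        };
        return new MiniOperator<>(fn).process(17, 42L);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "17@42"
    }
}
```

Leaving out the `function.super()` line makes the constructor fail to compile, since the compiler cannot find an enclosing MiniFunction instance from a static context.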
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index eea428f..9d7c444 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -344,7 +343,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+ private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -366,7 +365,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+ private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -393,7 +392,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+ private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -425,7 +424,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+ private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -452,7 +451,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+ private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -474,7 +473,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+ private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -506,7 +505,7 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+ private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index a7325a4..cebda5d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction}
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
import org.apache.flink.util.Collector
[4/5] flink git commit: [FLINK-4660] Allow CoProcessFunction on
non-keyed ConnectedStreams
Posted by al...@apache.org.
[FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams
Introduce new CoProcessOperator for this. Rename the pre-existing
CoProcessOperator to KeyedCoProcessOperator.
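The behaviour this commit describes can be sketched in a standalone form, without depending on Flink. The names CoProcessDispatchSketch, NonKeyedTimerService, chooseOperator and advanceWatermark below are hypothetical and only illustrate the idea; the TimerService interface is a simplified stand-in that mirrors the four methods visible in the diff, and the exception text is a paraphrase. The sketch assumes what the diff shows: the keyed operator is chosen only when both inputs are keyed, and the non-keyed operator lets a function query processing time and the current watermark but rejects timer registration.

```java
import java.util.function.LongSupplier;

public class CoProcessDispatchSketch {

    /** Simplified stand-in for a timer service; not the real Flink interface. */
    interface TimerService {
        long currentProcessingTime();
        long currentWatermark();
        void registerProcessingTimeTimer(long time);
        void registerEventTimeTimer(long time);
    }

    /** Non-keyed variant: time can be queried, but timers cannot be registered. */
    static final class NonKeyedTimerService implements TimerService {
        private final LongSupplier clock;
        // Tracked by the operator itself, since no internal timer service exists here.
        private long currentWatermark = Long.MIN_VALUE;

        NonKeyedTimerService(LongSupplier clock) {
            this.clock = clock;
        }

        /** Hypothetical hook, called when a watermark passes through the operator. */
        void advanceWatermark(long timestamp) {
            currentWatermark = timestamp;
        }

        @Override
        public long currentProcessingTime() {
            return clock.getAsLong();
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            throw new UnsupportedOperationException("Setting timers is only supported on keyed streams.");
        }

        @Override
        public void registerEventTimeTimer(long time) {
            throw new UnsupportedOperationException("Setting timers is only supported on keyed streams.");
        }
    }

    /** Returns the name of the operator that would be chosen for the given inputs. */
    static String chooseOperator(boolean firstInputKeyed, boolean secondInputKeyed) {
        return (firstInputKeyed && secondInputKeyed)
                ? "KeyedCoProcessOperator"
                : "CoProcessOperator";
    }

    public static void main(String[] args) {
        System.out.println(chooseOperator(true, true));
        System.out.println(chooseOperator(true, false));

        NonKeyedTimerService timers = new NonKeyedTimerService(System::currentTimeMillis);
        timers.advanceWatermark(5L);
        System.out.println("watermark = " + timers.currentWatermark());
        try {
            timers.registerEventTimeTimer(10L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at registration time, rather than silently dropping the timer, means a function written for keyed streams reports the mismatch as soon as it is applied to non-keyed inputs.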
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06740fb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06740fb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06740fb2
Branch: refs/heads/master
Commit: 06740fb28d3f3269d040d68cbfefc5201203b60d
Parents: e12f320
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 12:02:34 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100
----------------------------------------------------------------------
.../api/datastream/ConnectedStreams.java | 15 +-
.../api/operators/co/CoProcessOperator.java | 89 +--
.../operators/co/KeyedCoProcessOperator.java | 168 ++++++
.../api/operators/co/CoProcessOperatorTest.java | 401 +-------------
.../co/KeyedCoProcessOperatorTest.java | 535 +++++++++++++++++++
5 files changed, 742 insertions(+), 466 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 66601a7..9fe3a4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -29,9 +29,10 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import static java.util.Objects.requireNonNull;
@@ -281,13 +282,13 @@ public class ConnectedStreams<IN1, IN2> {
CoProcessFunction<IN1, IN2, R> coProcessFunction,
TypeInformation<R> outputType) {
- if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
- throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
- "when both input streams are keyed.");
- }
+ TwoInputStreamOperator<IN1, IN2, R> operator;
- CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
- inputStream1.clean(coProcessFunction));
+ if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
+ operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
+ } else {
+ operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
+ }
return transform("Co-Process", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index ed99815..4133e7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -18,35 +18,32 @@
package org.apache.flink.streaming.api.operators.co;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;
@Internal
-public class CoProcessOperator<K, IN1, IN2, OUT>
+public class CoProcessOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
- implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+ implements TwoInputStreamOperator<IN1, IN2, OUT> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
- private transient ContextImpl<IN1, IN2, OUT> context;
+ private transient ContextImpl context;
- private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
+ /** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+ private long currentWatermark = Long.MIN_VALUE;
public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
super(flatMapper);
@@ -57,13 +54,7 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
super.open();
collector = new TimestampedCollector<>(output);
- InternalTimerService<VoidNamespace> internalTimerService =
- getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
- TimerService timerService = new SimpleTimerService(internalTimerService);
-
- context = new ContextImpl<>(userFunction, timerService);
- onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+ context = new ContextImpl(userFunction, getProcessingTimeService());
}
@Override
@@ -83,36 +74,20 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
}
@Override
- public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
+ public void processWatermark(Watermark mark) throws Exception {
+ super.processWatermark(mark);
+ currentWatermark = mark.getTimestamp();
}
- @Override
- public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
- onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
- onTimerContext.timer = timer;
- userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
- onTimerContext.timeDomain = null;
- onTimerContext.timer = null;
- }
+ private class ContextImpl
+ extends CoProcessFunction<IN1, IN2, OUT>.Context
+ implements TimerService {
- protected TimestampedCollector<OUT> getCollector() {
- return collector;
- }
-
- private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
-
- private final TimerService timerService;
+ private final ProcessingTimeService timerService;
private StreamRecord<?> element;
- ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+ ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, ProcessingTimeService timerService) {
function.super();
this.timerService = checkNotNull(timerService);
}
@@ -129,40 +104,28 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
}
@Override
- public TimerService timerService() {
- return timerService;
+ public long currentProcessingTime() {
+ return timerService.getCurrentProcessingTime();
}
- }
- private static class OnTimerContextImpl<IN1, IN2, OUT>
- extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
-
- private final TimerService timerService;
-
- private TimeDomain timeDomain;
-
- private InternalTimer<?, VoidNamespace> timer;
-
- OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
- function.super();
- this.timerService = checkNotNull(timerService);
+ @Override
+ public long currentWatermark() {
+ return currentWatermark;
}
@Override
- public TimeDomain timeDomain() {
- checkState(timeDomain != null);
- return timeDomain;
+ public void registerProcessingTimeTimer(long time) {
+ throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
}
@Override
- public Long timestamp() {
- checkState(timer != null);
- return timer.getTimestamp();
+ public void registerEventTimeTimer(long time) {
+ throw new UnsupportedOperationException("Setting timers is only supported on a keyed streams.");
}
@Override
public TimerService timerService() {
- return timerService;
+ return this;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
new file mode 100644
index 0000000..e721ab8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
+ extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient ContextImpl<IN1, IN2, OUT> context;
+
+ private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
+
+ public KeyedCoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+ super(flatMapper);
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+ TimerService timerService = new SimpleTimerService(internalTimerService);
+
+ context = new ContextImpl<>(userFunction, timerService);
+ onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement1(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ collector.setTimestamp(element);
+ context.element = element;
+ userFunction.processElement2(element.getValue(), context, collector);
+ context.element = null;
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+ onTimerContext.timer = timer;
+ userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+ onTimerContext.timeDomain = null;
+ onTimerContext.timer = null;
+ }
+
+ protected TimestampedCollector<OUT> getCollector() {
+ return collector;
+ }
+
+ private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
+
+ private final TimerService timerService;
+
+ private StreamRecord<?> element;
+
+ ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(element != null);
+
+ if (element.hasTimestamp()) {
+ return element.getTimestamp();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+
+ private static class OnTimerContextImpl<IN1, IN2, OUT>
+ extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
+
+ private final TimerService timerService;
+
+ private TimeDomain timeDomain;
+
+ private InternalTimer<?, VoidNamespace> timer;
+
+ OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+ function.super();
+ this.timerService = checkNotNull(timerService);
+ }
+
+ @Override
+ public TimeDomain timeDomain() {
+ checkState(timeDomain != null);
+ return timeDomain;
+ }
+
+ @Override
+ public Long timestamp() {
+ checkState(timer != null);
+ return timer.getTimestamp();
+ }
+
+ @Override
+ public TimerService timerService() {
+ return timerService;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index 9d7c444..c19eb37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -18,18 +18,12 @@
package org.apache.flink.streaming.api.operators.co;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -46,15 +40,11 @@ public class CoProcessOperatorTest extends TestLogger {
@Test
public void testTimestampAndWatermarkQuerying() throws Exception {
- CoProcessOperator<String, Integer, String, String> operator =
+ CoProcessOperator<Integer, String, String> operator =
new CoProcessOperator<>(new WatermarkQueryingProcessFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
+ new TwoInputStreamOperatorTestHarness<>(operator);
testHarness.setup();
testHarness.open();
@@ -82,15 +72,11 @@ public class CoProcessOperatorTest extends TestLogger {
@Test
public void testTimestampAndProcessingTimeQuerying() throws Exception {
- CoProcessOperator<String, Integer, String, String> operator =
+ CoProcessOperator<Integer, String, String> operator =
new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
+ new TwoInputStreamOperatorTestHarness<>(operator);
testHarness.setup();
testHarness.open();
@@ -111,237 +97,6 @@ public class CoProcessOperatorTest extends TestLogger {
testHarness.close();
}
- @Test
- public void testEventTimeTimers() throws Exception {
-
- CoProcessOperator<String, Integer, String, String> operator =
- new CoProcessOperator<>(new EventTimeTriggeringProcessFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(17, 42L));
- testHarness.processElement2(new StreamRecord<>("18", 42L));
-
- testHarness.processWatermark1(new Watermark(5));
- testHarness.processWatermark2(new Watermark(5));
-
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
- expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
- expectedOutput.add(new StreamRecord<>("1777", 5L));
- expectedOutput.add(new Watermark(5L));
- expectedOutput.add(new StreamRecord<>("1777", 6L));
- expectedOutput.add(new Watermark(6L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testProcessingTimeTimers() throws Exception {
-
- CoProcessOperator<String, Integer, String, String> operator =
- new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(17));
- testHarness.processElement2(new StreamRecord<>("18"));
-
- testHarness.setProcessingTime(5);
- testHarness.setProcessingTime(6);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17"));
- expectedOutput.add(new StreamRecord<>("INPUT2:18"));
- expectedOutput.add(new StreamRecord<>("1777", 5L));
- expectedOutput.add(new StreamRecord<>("1777", 6L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testEventTimeTimerWithState() throws Exception {
-
- CoProcessOperator<String, Integer, String, String> operator =
- new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processWatermark1(new Watermark(1));
- testHarness.processWatermark2(new Watermark(1));
- testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
-
- testHarness.processWatermark1(new Watermark(2));
- testHarness.processWatermark2(new Watermark(2));
- testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
-
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- testHarness.processWatermark1(new Watermark(7));
- testHarness.processWatermark2(new Watermark(7));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new Watermark(1L));
- expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
- expectedOutput.add(new Watermark(2L));
- expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new Watermark(6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
- expectedOutput.add(new Watermark(7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- /**
- * Verifies that we don't have leakage between different keys.
- */
- @Test
- public void testProcessingTimeTimerWithState() throws Exception {
-
- CoProcessOperator<String, Integer, String, String> operator =
- new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.setProcessingTime(1);
- testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
-
- testHarness.setProcessingTime(2);
- testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
-
- testHarness.setProcessingTime(6);
- testHarness.setProcessingTime(7);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("INPUT1:17"));
- expectedOutput.add(new StreamRecord<>("INPUT2:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
- @Test
- public void testSnapshotAndRestore() throws Exception {
-
- CoProcessOperator<String, Integer, String, String> operator =
- new CoProcessOperator<>(new BothTriggeringProcessFunction());
-
- TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
- new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.open();
-
- testHarness.processElement1(new StreamRecord<>(5, 12L));
- testHarness.processElement2(new StreamRecord<>("5", 12L));
-
- // snapshot and restore from scratch
- OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
- testHarness.close();
-
- operator = new CoProcessOperator<>(new BothTriggeringProcessFunction());
-
- testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
- operator,
- new IntToStringKeySelector<>(),
- new IdentityKeySelector<String>(),
- BasicTypeInfo.STRING_TYPE_INFO);
-
- testHarness.setup();
- testHarness.initializeState(snapshot);
- testHarness.open();
-
- testHarness.setProcessingTime(5);
- testHarness.processWatermark1(new Watermark(6));
- testHarness.processWatermark2(new Watermark(6));
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
- expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
- expectedOutput.add(new Watermark(6));
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.close();
- }
-
-
- private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Integer value) throws Exception {
- return "" + value;
- }
- }
-
- private static class IdentityKeySelector<T> implements KeySelector<T, T> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public T getKey(T value) throws Exception {
- return value;
- }
- }
private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
@@ -365,92 +120,6 @@ public class CoProcessOperatorTest extends TestLogger {
}
}
- private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- ctx.timerService().registerEventTimeTimer(5);
- }
-
- @Override
- public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
-
- assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
- out.collect("" + 1777);
- }
- }
-
- private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<String> state =
- new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
-
- @Override
- public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- getRuntimeContext().getState(state).update("" + value);
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- }
-
- @Override
- public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- getRuntimeContext().getState(state).update(value);
- ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- ctx.timerService().registerProcessingTimeTimer(5);
- }
-
- @Override
- public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- ctx.timerService().registerProcessingTimeTimer(6);
- }
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
-
- assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
- out.collect("" + 1777);
- }
- }
-
private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
private static final long serialVersionUID = 1L;
@@ -472,64 +141,4 @@ public class CoProcessOperatorTest extends TestLogger {
Collector<String> out) throws Exception {
}
}
-
- private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- private final ValueStateDescriptor<String> state =
- new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
-
- @Override
- public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT1:" + value);
- getRuntimeContext().getState(state).update("" + value);
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
-
- @Override
- public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
- out.collect("INPUT2:" + value);
- getRuntimeContext().getState(state).update(value);
- ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
- out.collect("STATE:" + getRuntimeContext().getState(state).value());
- }
- }
-
- private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerEventTimeTimer(6);
- }
-
- @Override
- public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
- ctx.timerService().registerProcessingTimeTimer(5);
- }
-
-
- @Override
- public void onTimer(
- long timestamp,
- OnTimerContext ctx,
- Collector<String> out) throws Exception {
- if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
- out.collect("EVENT:1777");
- } else {
- out.collect("PROC:1777");
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
new file mode 100644
index 0000000..d8c9a61
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link KeyedCoProcessOperator}.
+ */
+public class KeyedCoProcessOperatorTest extends TestLogger {
+
+ @Test
+ public void testTimestampAndWatermarkQuerying() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(17));
+ testHarness.processWatermark2(new Watermark(17));
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+ testHarness.processWatermark1(new Watermark(42));
+ testHarness.processWatermark2(new Watermark(42));
+ testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(17L));
+ expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+ expectedOutput.add(new Watermark(42L));
+ expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(17);
+ testHarness.processElement1(new StreamRecord<>(5));
+
+ testHarness.setProcessingTime(42);
+ testHarness.processElement2(new StreamRecord<>("6"));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+ expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testEventTimeTimers() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17, 42L));
+ testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+ testHarness.processWatermark1(new Watermark(5));
+ testHarness.processWatermark2(new Watermark(5));
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+ expectedOutput.add(new StreamRecord<>("1777", 5L));
+ expectedOutput.add(new Watermark(5L));
+ expectedOutput.add(new StreamRecord<>("1777", 6L));
+ expectedOutput.add(new Watermark(6L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTimers() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(17));
+ testHarness.processElement2(new StreamRecord<>("18"));
+
+ testHarness.setProcessingTime(5);
+ testHarness.setProcessingTime(6);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+ expectedOutput.add(new StreamRecord<>("1777", 5L));
+ expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testEventTimeTimerWithState() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processWatermark1(new Watermark(1));
+ testHarness.processWatermark2(new Watermark(1));
+ testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+ testHarness.processWatermark1(new Watermark(2));
+ testHarness.processWatermark2(new Watermark(2));
+ testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
+
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ testHarness.processWatermark1(new Watermark(7));
+ testHarness.processWatermark2(new Watermark(7));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new Watermark(1L));
+ expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+ expectedOutput.add(new Watermark(2L));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new Watermark(6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new Watermark(7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ /**
+ * Verifies that we don't have leakage between different keys.
+ */
+ @Test
+ public void testProcessingTimeTimerWithState() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.setProcessingTime(1);
+ testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
+
+ testHarness.setProcessingTime(2);
+ testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
+
+ testHarness.setProcessingTime(6);
+ testHarness.setProcessingTime(7);
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+ expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+ expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+ expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+
+ KeyedCoProcessOperator<String, Integer, String, String> operator =
+ new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+
+ TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(5, 12L));
+ testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+ // snapshot and restore from scratch
+ OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+ testHarness.close();
+
+ operator = new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+
+ testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+ operator,
+ new IntToStringKeySelector<>(),
+ new IdentityKeySelector<String>(),
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setup();
+ testHarness.initializeState(snapshot);
+ testHarness.open();
+
+ testHarness.setProcessingTime(5);
+ testHarness.processWatermark1(new Watermark(6));
+ testHarness.processWatermark2(new Watermark(6));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+ expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+ expectedOutput.add(new Watermark(6));
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.close();
+ }
+
+
+ private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String getKey(Integer value) throws Exception {
+ return "" + value;
+ }
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ }
+ }
+
+ private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerEventTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+
+ assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+ out.collect("" + 1777);
+ }
+ }
+
+ private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ getRuntimeContext().getState(state).update("" + value);
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ getRuntimeContext().getState(state).update(value);
+ ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ ctx.timerService().registerProcessingTimeTimer(5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ ctx.timerService().registerProcessingTimeTimer(6);
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+
+ assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+ out.collect("" + 1777);
+ }
+ }
+
+ private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+ }
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ }
+ }
+
+ private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final ValueStateDescriptor<String> state =
+ new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT1:" + value);
+ getRuntimeContext().getState(state).update("" + value);
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ out.collect("INPUT2:" + value);
+ getRuntimeContext().getState(state).update(value);
+ ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+ out.collect("STATE:" + getRuntimeContext().getState(state).value());
+ }
+ }
+
+ private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerEventTimeTimer(6);
+ }
+
+ @Override
+ public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+ ctx.timerService().registerProcessingTimeTimer(5);
+ }
+
+
+ @Override
+ public void onTimer(
+ long timestamp,
+ OnTimerContext ctx,
+ Collector<String> out) throws Exception {
+ if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+ out.collect("EVENT:1777");
+ } else {
+ out.collect("PROC:1777");
+ }
+ }
+ }
+}