You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/10/21 17:14:22 UTC
[07/11] flink git commit: [FLINK-3674] Add an interface for Time aware User Functions
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index 448f95f..00d4722 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@Internal
@@ -31,12 +30,6 @@ public class CoStreamMap<IN1, IN2, OUT>
private static final long serialVersionUID = 1L;
- // We keep track of watermarks from both inputs, the combined input is the minimum
- // Once the minimum advances we emit a new watermark for downstream operators
- private long combinedWatermark = Long.MIN_VALUE;
- private long input1Watermark = Long.MIN_VALUE;
- private long input2Watermark = Long.MIN_VALUE;
-
public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
super(mapper);
}
@@ -50,24 +43,4 @@ public class CoStreamMap<IN1, IN2, OUT>
public void processElement2(StreamRecord<IN2> element) throws Exception {
output.collect(element.replace(userFunction.map2(element.getValue())));
}
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- input1Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- input2Watermark = mark.getTimestamp();
- long newMin = Math.min(input1Watermark, input2Watermark);
- if (newMin > combinedWatermark) {
- combinedWatermark = newMin;
- output.emitWatermark(new Watermark(combinedWatermark));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
new file mode 100644
index 0000000..df2320f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+@Internal
+public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
+ extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
+ implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TypeSerializer<K> keySerializer;
+
+ private transient TimestampedCollector<OUT> collector;
+
+ private transient TimerService timerService;
+
+ public CoStreamTimelyFlatMap(
+ TypeSerializer<K> keySerializer,
+ TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+ super(flatMapper);
+
+ this.keySerializer = keySerializer;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ collector = new TimestampedCollector<>(output);
+
+ InternalTimerService<VoidNamespace> internalTimerService =
+ getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+
+ this.timerService = new SimpleTimerService(internalTimerService);
+ }
+
+ @Override
+ public void processElement1(StreamRecord<IN1> element) throws Exception {
+ collector.setTimestamp(element);
+ userFunction.flatMap1(element.getValue(), timerService, collector);
+
+ }
+
+ @Override
+ public void processElement2(StreamRecord<IN2> element) throws Exception {
+ collector.setTimestamp(element);
+ userFunction.flatMap2(element.getValue(), timerService, collector);
+ }
+
+ @Override
+ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ userFunction.onTimer(timer.getTimestamp(), TimeDomain.EVENT_TIME, timerService, collector);
+ }
+
+ @Override
+ public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+ collector.setAbsoluteTimestamp(timer.getTimestamp());
+ userFunction.onTimer(timer.getTimestamp(), TimeDomain.PROCESSING_TIME, timerService, collector);
+ }
+
+ protected TimestampedCollector<OUT> getCollector() {
+ return collector;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index b5500b7..36492d7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
@@ -76,6 +75,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
@Override
public void open() throws Exception {
+ super.open();
committer.setOperatorId(id);
committer.setOperatorSubtaskId(getRuntimeContext().getIndexOfThisSubtask());
committer.open();
@@ -113,6 +113,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
public void snapshotState(FSDataOutputStream out,
long checkpointId,
long timestamp) throws Exception {
+ super.snapshotState(out, checkpointId, timestamp);
saveHandleInState(checkpointId, timestamp);
@@ -121,6 +122,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
@Override
public void restoreState(FSDataInputStream in) throws Exception {
+ super.restoreState(in);
this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader());
}
@@ -203,11 +205,6 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
serializer.serialize(value, new DataOutputViewStreamWrapper(out));
}
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- //don't do anything, since we are a sink
- }
-
/**
* This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were
* used since the last completed checkpoint.
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index d331d4d..2a77c0a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -32,7 +32,6 @@ import org.apache.flink.util.MathUtils;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -208,11 +207,6 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
}
@Override
- public void processWatermark(Watermark mark) {
- // this operator does not react to watermarks
- }
-
- @Override
public void trigger(long timestamp) throws Exception {
// first we check if we actually trigger the window function
if (timestamp == nextEvaluationTime) {
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
index 79ef4c6..a252ece 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingKeyedTimePanes.java
@@ -66,7 +66,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
// optimized path for single pane case (tumbling window)
for (KeyMap.Entry<Key, ArrayList<Type>> entry : latestPane) {
Key key = entry.getKey();
- operator.setKeyContext(key);
+ operator.setCurrentKey(key);
function.apply(entry.getKey(), window, entry.getValue(), out);
}
}
@@ -122,7 +122,7 @@ public class AccumulatingKeyedTimePanes<Type, Key, Result> extends AbstractKeyed
@Override
public void keyDone() throws Exception {
- contextOperator.setKeyContext(currentKey);
+ contextOperator.setCurrentKey(currentKey);
function.apply(currentKey, window, unionIterator, out);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
index dfa357e..84686a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingKeyedTimePanes.java
@@ -93,7 +93,7 @@ public class AggregatingKeyedTimePanes<Type, Key> extends AbstractKeyedTimePanes
@Override
public void startNewKey(Key key) {
currentValue = null;
- operator.setKeyContext(key);
+ operator.setCurrentKey(key);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 141b5b8..2f4dbde 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
@@ -204,110 +204,82 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
- boolean fire;
- do {
- Timer<K, W> timer = watermarkTimersQueue.peek();
- if (timer != null && timer.timestamp <= mark.getTimestamp()) {
- fire = true;
-
- watermarkTimers.remove(timer);
- watermarkTimersQueue.remove();
-
- context.key = timer.key;
- context.window = timer.window;
- setKeyContext(timer.key);
-
- ListState<StreamRecord<IN>> windowState;
- MergingWindowSet<W> mergingWindows = null;
-
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
- if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- continue;
- }
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
- } else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
- }
+ public void onEventTime(InternalTimer<K, W> timer) throws Exception {
- Iterable<StreamRecord<IN>> contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- continue;
- }
-
- TriggerResult triggerResult = context.onEventTime(timer.timestamp);
- if (triggerResult.isFire()) {
- fire(context.window, contents);
- }
+ context.key = timer.getKey();
+ context.window = timer.getNamespace();
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
- cleanup(context.window, windowState, mergingWindows);
- }
+ ListState<StreamRecord<IN>> windowState;
+ MergingWindowSet<W> mergingWindows = null;
- } else {
- fire = false;
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ if (stateWindow == null) {
+ // then the window is already purged and this is a cleanup
+ // timer set due to allowed lateness that has nothing to clean,
+ // so it is safe to just ignore
+ return;
}
- } while (fire);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(
+ context.window,
+ windowSerializer,
+ windowStateDescriptor);
+ }
+
+ Iterable<StreamRecord<IN>> contents = windowState.get();
+ if (contents == null) {
+ // if we have no state, there is nothing to do
+ return;
+ }
- output.emitWatermark(mark);
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ fire(context.window, contents);
+ }
- this.currentWatermark = mark.getTimestamp();
+ if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+ cleanup(context.window, windowState, mergingWindows);
+ }
}
@Override
- public void trigger(long time) throws Exception {
- Timer<K, W> timer;
-
- while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
-
- processingTimeTimers.remove(timer);
- processingTimeTimersQueue.remove();
-
- context.key = timer.key;
- context.window = timer.window;
- setKeyContext(timer.key);
+ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+ context.key = timer.getKey();
+ context.window = timer.getNamespace();
- ListState<StreamRecord<IN>> windowState;
- MergingWindowSet<W> mergingWindows = null;
+ ListState<StreamRecord<IN>> windowState;
+ MergingWindowSet<W> mergingWindows = null;
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
- if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- continue;
- }
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
- } else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
- }
-
- Iterable<StreamRecord<IN>> contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- continue;
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ if (stateWindow == null) {
+ // then the window is already purged and this is a cleanup
+ // timer set due to allowed lateness that has nothing to clean,
+ // so it is safe to just ignore
+ return;
}
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
- TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
- if (triggerResult.isFire()) {
- fire(context.window, contents);
- }
+ Iterable<StreamRecord<IN>> contents = windowState.get();
+ if (contents == null) {
+ // if we have no state, there is nothing to do
+ return;
+ }
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
- cleanup(context.window, windowState, mergingWindows);
- }
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ fire(context.window, contents);
}
- if (timer != null) {
- nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
+ if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+ cleanup(context.window, windowState, mergingWindows);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 459c679..bc37692 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -37,36 +37,28 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -96,7 +88,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
- implements OneInputStreamOperator<IN, OUT>, Triggerable {
+ implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
private static final long serialVersionUID = 1L;
@@ -141,14 +133,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
protected transient TimestampedCollector<OUT> timestampedCollector;
- protected transient ScheduledFuture<?> nextTimer;
-
- /**
- * To keep track of the current watermark so that we can immediately fire if a trigger
- * registers an event time callback for a timestamp that lies in the past.
- */
- protected long currentWatermark = Long.MIN_VALUE;
-
protected transient Context context = new Context(null, null);
protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
@@ -157,17 +141,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// State that needs to be checkpointed
// ------------------------------------------------------------------------
- /**
- * Processing time timers that are currently in-flight.
- */
- protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
- protected transient Set<Timer<K, W>> processingTimeTimers;
-
- /**
- * Current waiting watermark callbacks.
- */
- protected transient Set<Timer<K, W>> watermarkTimers;
- protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
+ private transient InternalTimerService<W> internalTimerService;
protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;
@@ -208,49 +182,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
timestampedCollector = new TimestampedCollector<>(output);
- // these could already be initialized from restoreState()
- if (watermarkTimers == null) {
- watermarkTimers = new HashSet<>();
- watermarkTimersQueue = new PriorityQueue<>(100);
- }
- if (processingTimeTimers == null) {
- processingTimeTimers = new HashSet<>();
- processingTimeTimersQueue = new PriorityQueue<>(100);
- }
+ internalTimerService =
+ getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
context = new Context(null, null);
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
- return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+ return internalTimerService.currentProcessingTime();
}
};
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindowsByKey = new HashMap<>();
}
-
- // re-register the restored timers (if any)
- if (processingTimeTimersQueue.size() > 0) {
- nextTimer = getProcessingTimeService().registerTimer(processingTimeTimersQueue.peek().timestamp, this);
- }
}
@Override
public final void close() throws Exception {
super.close();
-
- if (nextTimer != null) {
- nextTimer.cancel(false);
- nextTimer = null;
- }
-
timestampedCollector = null;
- watermarkTimers = null;
- watermarkTimersQueue = null;
- processingTimeTimers = null;
- processingTimeTimersQueue = null;
context = null;
windowAssignerContext = null;
mergingWindowsByKey = null;
@@ -259,17 +211,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public void dispose() throws Exception {
super.dispose();
-
- if (nextTimer != null) {
- nextTimer.cancel(false);
- nextTimer = null;
- }
-
timestampedCollector = null;
- watermarkTimers = null;
- watermarkTimersQueue = null;
- processingTimeTimers = null;
- processingTimeTimersQueue = null;
context = null;
windowAssignerContext = null;
mergingWindowsByKey = null;
@@ -392,110 +334,81 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
@Override
- public void processWatermark(Watermark mark) throws Exception {
- boolean fire;
- do {
- Timer<K, W> timer = watermarkTimersQueue.peek();
- if (timer != null && timer.timestamp <= mark.getTimestamp()) {
- fire = true;
-
- watermarkTimers.remove(timer);
- watermarkTimersQueue.remove();
-
- context.key = timer.key;
- context.window = timer.window;
- setKeyContext(timer.key);
-
- AppendingState<IN, ACC> windowState;
- MergingWindowSet<W> mergingWindows = null;
-
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
- if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- continue;
- }
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
- } else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
- }
-
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- continue;
- }
-
- TriggerResult triggerResult = context.onEventTime(timer.timestamp);
- if (triggerResult.isFire()) {
- fire(context.window, contents);
- }
+ public void onEventTime(InternalTimer<K, W> timer) throws Exception {
+ context.key = timer.getKey();
+ context.window = timer.getNamespace();
- if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
- cleanup(context.window, windowState, mergingWindows);
- }
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
- } else {
- fire = false;
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ if (stateWindow == null) {
+ // then the window is already purged and this is a cleanup
+ // timer set due to allowed lateness that has nothing to clean,
+ // so it is safe to just ignore
+ return;
}
- } while (fire);
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(
+ context.window,
+ windowSerializer,
+ windowStateDescriptor);
+ }
+
+ ACC contents = windowState.get();
+ if (contents == null) {
+ // if we have no state, there is nothing to do
+ return;
+ }
- output.emitWatermark(mark);
+ TriggerResult triggerResult = context.onEventTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ fire(context.window, contents);
+ }
- this.currentWatermark = mark.getTimestamp();
+ if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+ cleanup(context.window, windowState, mergingWindows);
+ }
}
@Override
- public void trigger(long time) throws Exception {
- Timer<K, W> timer;
-
- while ((timer = processingTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
-
- processingTimeTimers.remove(timer);
- processingTimeTimersQueue.remove();
-
- context.key = timer.key;
- context.window = timer.window;
- setKeyContext(timer.key);
-
- AppendingState<IN, ACC> windowState;
- MergingWindowSet<W> mergingWindows = null;
+ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
+ context.key = timer.getKey();
+ context.window = timer.getNamespace();
- if (windowAssigner instanceof MergingWindowAssigner) {
- mergingWindows = getMergingWindowSet();
- W stateWindow = mergingWindows.getStateWindow(context.window);
- if (stateWindow == null) {
- // then the window is already purged and this is a cleanup
- // timer set due to allowed lateness that has nothing to clean,
- // so it is safe to just ignore
- continue;
- }
- windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
- } else {
- windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
- }
+ AppendingState<IN, ACC> windowState;
+ MergingWindowSet<W> mergingWindows = null;
- ACC contents = windowState.get();
- if (contents == null) {
- // if we have no state, there is nothing to do
- continue;
+ if (windowAssigner instanceof MergingWindowAssigner) {
+ mergingWindows = getMergingWindowSet();
+ W stateWindow = mergingWindows.getStateWindow(context.window);
+ if (stateWindow == null) {
+ // then the window is already purged and this is a cleanup
+ // timer set due to allowed lateness that has nothing to clean,
+ // so it is safe to just ignore
+ return;
}
+ windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
+ } else {
+ windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
+ }
- TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
- if (triggerResult.isFire()) {
- fire(context.window, contents);
- }
+ ACC contents = windowState.get();
+ if (contents == null) {
+ // if we have no state, there is nothing to do
+ return;
+ }
- if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
- cleanup(context.window, windowState, mergingWindows);
- }
+ TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+ if (triggerResult.isFire()) {
+ fire(context.window, contents);
}
- if (timer != null) {
- nextTimer = getProcessingTimeService().registerTimer(timer.timestamp, this);
+ if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
+ cleanup(context.window, windowState, mergingWindows);
}
}
@@ -555,7 +468,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* considered when triggering.
*/
protected boolean isLate(W window) {
- return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
+ return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
/**
@@ -638,7 +551,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
public long getCurrentWatermark() {
- return currentWatermark;
+ return internalTimerService.currentWatermark();
}
@Override
@@ -697,54 +610,27 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
@Override
public long getCurrentProcessingTime() {
- return WindowOperator.this.getProcessingTimeService().getCurrentProcessingTime();
+ return internalTimerService.currentProcessingTime();
}
@Override
public void registerProcessingTimeTimer(long time) {
- Timer<K, W> timer = new Timer<>(time, key, window);
-
- // make sure we only put one timer per key into the queue
- if (processingTimeTimers.add(timer)) {
-
- Timer<K, W> oldHead = processingTimeTimersQueue.peek();
- long nextTriggerTime = oldHead != null ? oldHead.timestamp : Long.MAX_VALUE;
-
- processingTimeTimersQueue.add(timer);
-
- // check if we need to re-schedule our timer to earlier
- if (time < nextTriggerTime) {
- if (nextTimer != null) {
- nextTimer.cancel(false);
- }
- nextTimer = getProcessingTimeService().registerTimer(time, WindowOperator.this);
- }
- }
+ internalTimerService.registerProcessingTimeTimer(window, time);
}
@Override
public void registerEventTimeTimer(long time) {
- Timer<K, W> timer = new Timer<>(time, key, window);
- if (watermarkTimers.add(timer)) {
- watermarkTimersQueue.add(timer);
- }
+ internalTimerService.registerEventTimeTimer(window, time);
}
@Override
public void deleteProcessingTimeTimer(long time) {
- Timer<K, W> timer = new Timer<>(time, key, window);
-
- if (processingTimeTimers.remove(timer)) {
- processingTimeTimersQueue.remove(timer);
- }
+ internalTimerService.deleteProcessingTimeTimer(window, time);
}
@Override
public void deleteEventTimeTimer(long time) {
- Timer<K, W> timer = new Timer<>(time, key, window);
- if (watermarkTimers.remove(timer)) {
- watermarkTimersQueue.remove(timer);
- }
+ internalTimerService.deleteEventTimeTimer(window, time);
}
public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
@@ -843,67 +729,21 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
ListStateDescriptor<Tuple2<W, W>> mergeStateDescriptor = new ListStateDescriptor<>("merging-window-set", tupleSerializer);
for (Map.Entry<K, MergingWindowSet<W>> key: mergingWindowsByKey.entrySet()) {
- setKeyContext(key.getKey());
+ setCurrentKey(key.getKey());
ListState<Tuple2<W, W>> mergeState = getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, mergeStateDescriptor);
mergeState.clear();
key.getValue().persist(mergeState);
}
}
- snapshotTimers(new DataOutputViewStreamWrapper(out));
-
super.snapshotState(out, checkpointId, timestamp);
}
@Override
public void restoreState(FSDataInputStream in) throws Exception {
- restoreTimers(new DataInputViewStreamWrapper(in));
-
super.restoreState(in);
}
- private void restoreTimers(DataInputView in ) throws IOException {
- int numWatermarkTimers = in.readInt();
- watermarkTimers = new HashSet<>(numWatermarkTimers);
- watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1));
- for (int i = 0; i < numWatermarkTimers; i++) {
- K key = keySerializer.deserialize(in);
- W window = windowSerializer.deserialize(in);
- long timestamp = in.readLong();
- Timer<K, W> timer = new Timer<>(timestamp, key, window);
- watermarkTimers.add(timer);
- watermarkTimersQueue.add(timer);
- }
-
- int numProcessingTimeTimers = in.readInt();
- processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1));
- processingTimeTimers = new HashSet<>();
- for (int i = 0; i < numProcessingTimeTimers; i++) {
- K key = keySerializer.deserialize(in);
- W window = windowSerializer.deserialize(in);
- long timestamp = in.readLong();
- Timer<K, W> timer = new Timer<>(timestamp, key, window);
- processingTimeTimersQueue.add(timer);
- processingTimeTimers.add(timer);
- }
- }
-
- private void snapshotTimers(DataOutputView out) throws IOException {
- out.writeInt(watermarkTimersQueue.size());
- for (Timer<K, W> timer : watermarkTimersQueue) {
- keySerializer.serialize(timer.key, out);
- windowSerializer.serialize(timer.window, out);
- out.writeLong(timer.timestamp);
- }
-
- out.writeInt(processingTimeTimers.size());
- for (Timer<K,W> timer : processingTimeTimers) {
- keySerializer.serialize(timer.key, out);
- windowSerializer.serialize(timer.window, out);
- out.writeLong(timer.timestamp);
- }
- }
-
// ------------------------------------------------------------------------
// Getters for testing
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index d2bf133..d0a2ea9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -20,9 +20,11 @@ package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
@@ -42,7 +44,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
private volatile boolean isQuiesced;
// sorts the timers by timestamp so that they are processed in the correct order.
- private final Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
+ private final Map<Long, List<ScheduledTimerFuture>> registeredTasks = new TreeMap<>();
public void setCurrentTime(long timestamp) throws Exception {
@@ -53,10 +55,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
// we do not fire them here to be able to accommodate timers
// that register other timers.
- Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
- List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
+ Iterator<Map.Entry<Long, List<ScheduledTimerFuture>>> it = registeredTasks.entrySet().iterator();
+ List<Map.Entry<Long, List<ScheduledTimerFuture>>> toRun = new ArrayList<>();
while (it.hasNext()) {
- Map.Entry<Long, List<Triggerable>> t = it.next();
+ Map.Entry<Long, List<ScheduledTimerFuture>> t = it.next();
if (t.getKey() <= this.currentTime) {
toRun.add(t);
it.remove();
@@ -64,10 +66,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
}
// now do the actual firing.
- for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+ for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
long now = tasks.getKey();
- for (Triggerable task: tasks.getValue()) {
- task.trigger(now);
+ for (ScheduledTimerFuture task: tasks.getValue()) {
+ task.getTriggerable().trigger(now);
}
}
}
@@ -84,7 +86,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
throw new IllegalStateException("terminated");
}
if (isQuiesced) {
- return new DummyFuture();
+ return new ScheduledTimerFuture(null, -1);
}
if (timestamp <= currentTime) {
@@ -94,14 +96,17 @@ public class TestProcessingTimeService extends ProcessingTimeService {
throw new RuntimeException(e);
}
}
- List<Triggerable> tasks = registeredTasks.get(timestamp);
+
+ ScheduledTimerFuture result = new ScheduledTimerFuture(target, timestamp);
+
+ List<ScheduledTimerFuture> tasks = registeredTasks.get(timestamp);
if (tasks == null) {
tasks = new ArrayList<>();
registeredTasks.put(timestamp, tasks);
}
- tasks.add(target);
+ tasks.add(result);
- return new DummyFuture();
+ return result;
}
@Override
@@ -124,15 +129,34 @@ public class TestProcessingTimeService extends ProcessingTimeService {
public int getNumRegisteredTimers() {
int count = 0;
- for (List<Triggerable> tasks: registeredTasks.values()) {
+ for (List<ScheduledTimerFuture> tasks: registeredTasks.values()) {
count += tasks.size();
}
return count;
}
+ public Set<Long> getRegisteredTimerTimestamps() {
+ Set<Long> actualTimestamps = new HashSet<>();
+ for (List<ScheduledTimerFuture> timerFutures : registeredTasks.values()) {
+ for (ScheduledTimerFuture timer : timerFutures) {
+ actualTimestamps.add(timer.getTimestamp());
+ }
+ }
+ return actualTimestamps;
+ }
+
// ------------------------------------------------------------------------
- private static class DummyFuture implements ScheduledFuture<Object> {
+ private class ScheduledTimerFuture implements ScheduledFuture<Object> {
+
+ private final Triggerable triggerable;
+
+ private final long timestamp;
+
+ public ScheduledTimerFuture(Triggerable triggerable, long timestamp) {
+ this.triggerable = triggerable;
+ this.timestamp = timestamp;
+ }
@Override
public long getDelay(TimeUnit unit) {
@@ -146,6 +170,10 @@ public class TestProcessingTimeService extends ProcessingTimeService {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
+ List<ScheduledTimerFuture> scheduledTimerFutures = registeredTasks.get(timestamp);
+ if (scheduledTimerFutures != null) {
+ scheduledTimerFutures.remove(this);
+ }
return true;
}
@@ -168,5 +196,13 @@ public class TestProcessingTimeService extends ProcessingTimeService {
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
+
+ public Triggerable getTriggerable() {
+ return triggerable;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
new file mode 100644
index 0000000..84af997
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.InputStream;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests for {@link HeapInternalTimerService}.
+ */
+public class HeapInternalTimerServiceTest {
+
+ private static InternalTimer<Integer, String> anyInternalTimer() {
+ return any();
+ }
+
+ /**
+ * Verify that we only ever have one processing-time task registered at the
+ * {@link ProcessingTimeService}.
+ */
+ @Test
+ public void testOnlySetsOnePhysicalProcessingTimeTimer() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("ciao", 20);
+ timerService.registerProcessingTimeTimer("ciao", 30);
+ timerService.registerProcessingTimeTimer("hello", 10);
+ timerService.registerProcessingTimeTimer("hello", 20);
+
+ assertEquals(5, timerService.numProcessingTimeTimers());
+ assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(3, timerService.numProcessingTimeTimers("ciao"));
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+ processingTimeService.setCurrentTime(10);
+
+ assertEquals(3, timerService.numProcessingTimeTimers());
+ assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+ processingTimeService.setCurrentTime(20);
+
+ assertEquals(1, timerService.numProcessingTimeTimers());
+ assertEquals(0, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+
+ processingTimeService.setCurrentTime(30);
+
+ assertEquals(0, timerService.numProcessingTimeTimers());
+
+ assertEquals(0, processingTimeService.getNumRegisteredTimers());
+
+ timerService.registerProcessingTimeTimer("ciao", 40);
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ }
+
+ /**
+ * Verify that registering a processing-time timer that is earlier than the existing timers
+ * removes the one physical timer and creates a new one for the earlier timestamp at the
+ * {@link ProcessingTimeService}.
+ */
+ @Test
+ public void testRegisterEarlierProcessingTimerMovesPhysicalProcessingTimer() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 20);
+
+ assertEquals(1, timerService.numProcessingTimeTimers());
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+
+ assertEquals(2, timerService.numProcessingTimeTimers());
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+ }
+
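+ /** Verify that registering a processing-time timer from within {@code onProcessingTime()}
+ * does not leak physical timers: exactly one stays registered at the {@link ProcessingTimeService}. */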
+ @Test
+ public void testRegisteringProcessingTimeTimerInOnProcessingTimeDoesNotLeakPhysicalTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+
+ final HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+
+ assertEquals(1, timerService.numProcessingTimeTimers());
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(10L));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ timerService.registerProcessingTimeTimer("ciao", 20);
+ return null;
+ }
+ }).when(mockTriggerable).onProcessingTime(anyInternalTimer());
+
+ processingTimeService.setCurrentTime(10);
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(20L));
+
+ doAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Exception {
+ timerService.registerProcessingTimeTimer("ciao", 30);
+ return null;
+ }
+ }).when(mockTriggerable).onProcessingTime(anyInternalTimer());
+
+ processingTimeService.setCurrentTime(20);
+
+ assertEquals(1, timerService.numProcessingTimeTimers());
+
+ assertEquals(1, processingTimeService.getNumRegisteredTimers());
+ assertThat(processingTimeService.getRegisteredTimerTimestamps(), containsInAnyOrder(30L));
+ }
+
+
+ @Test
+ public void testCurrentProcessingTime() throws Exception {
+
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ processingTimeService.setCurrentTime(17L);
+ assertEquals(17, timerService.currentProcessingTime());
+
+ processingTimeService.setCurrentTime(42);
+ assertEquals(42, timerService.currentProcessingTime());
+ }
+
+ @Test
+ public void testCurrentEventTime() throws Exception {
+
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ timerService.advanceWatermark(17);
+ assertEquals(17, timerService.currentWatermark());
+
+ timerService.advanceWatermark(42);
+ assertEquals(42, timerService.currentWatermark());
+ }
+
+ /**
+ * Verifies that event-time timers fire once the watermark reaches them, without leakage between keys/namespaces.
+ */
+ @Test
+ public void testSetAndFireEventTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+
+ assertEquals(4, timerService.numEventTimeTimers());
+ assertEquals(2, timerService.numEventTimeTimers("hello"));
+ assertEquals(2, timerService.numEventTimeTimers("ciao"));
+
+ timerService.advanceWatermark(10);
+
+ verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer());
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+ assertEquals(0, timerService.numEventTimeTimers());
+ }
+
+ /**
+ * Verifies that processing-time timers fire once processing time reaches them, without leakage between keys/namespaces.
+ */
+ @Test
+ public void testSetAndFireProcessingTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+
+ assertEquals(4, timerService.numProcessingTimeTimers());
+ assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+
+ processingTimeService.setCurrentTime(10);
+
+ verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer());
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+ assertEquals(0, timerService.numProcessingTimeTimers());
+ }
+
+ /**
+ * Verifies that deleting an event-time timer only affects the key/namespace it was deleted for.
+ *
+ * <p>This also verifies that deleted timers don't fire.
+ */
+ @Test
+ public void testDeleteEventTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+
+ assertEquals(4, timerService.numEventTimeTimers());
+ assertEquals(2, timerService.numEventTimeTimers("hello"));
+ assertEquals(2, timerService.numEventTimeTimers("ciao"));
+
+ keyContext.setCurrentKey(0);
+ timerService.deleteEventTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+ timerService.deleteEventTimeTimer("ciao", 10);
+
+ assertEquals(2, timerService.numEventTimeTimers());
+ assertEquals(1, timerService.numEventTimeTimers("hello"));
+ assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+ timerService.advanceWatermark(10);
+
+ verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer());
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "ciao")));
+ verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+ verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+ verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "hello")));
+
+ assertEquals(0, timerService.numEventTimeTimers());
+ }
+
+ /**
+ * Verifies that deleting a processing-time timer only affects the key/namespace it was
+ * deleted for, i.e. that there is no leakage between keys/namespaces.
+ * <p>This also verifies that deleted timers don't fire and that no timers remain afterwards.
+ */
+ @Test
+ public void testDeleteProcessingTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+
+ assertEquals(4, timerService.numProcessingTimeTimers());
+ assertEquals(2, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(2, timerService.numProcessingTimeTimers("ciao"));
+ // delete one timer per key, each in a different namespace
+ keyContext.setCurrentKey(0);
+ timerService.deleteProcessingTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+ timerService.deleteProcessingTimeTimer("ciao", 10);
+
+ assertEquals(2, timerService.numProcessingTimeTimers());
+ assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+
+ processingTimeService.setCurrentTime(10);
+
+ verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer());
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+ verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 0, "hello")));
+ verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, 1, "ciao")));
+ verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+ // the surviving timers have fired, so no processing-time timers may remain
+ assertEquals(0, timerService.numProcessingTimeTimers());
+ }
+
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ HeapInternalTimerService<Integer, String> timerService =
+ createTimerService(mockTriggerable, keyContext, processingTimeService);
+
+ keyContext.setCurrentKey(0);
+
+ timerService.registerProcessingTimeTimer("ciao", 10);
+ timerService.registerEventTimeTimer("hello", 10);
+
+ keyContext.setCurrentKey(1);
+
+ timerService.registerEventTimeTimer("ciao", 10);
+ timerService.registerProcessingTimeTimer("hello", 10);
+
+ assertEquals(2, timerService.numProcessingTimeTimers());
+ assertEquals(1, timerService.numProcessingTimeTimers("hello"));
+ assertEquals(1, timerService.numProcessingTimeTimers("ciao"));
+ assertEquals(2, timerService.numEventTimeTimers());
+ assertEquals(1, timerService.numEventTimeTimers("hello"));
+ assertEquals(1, timerService.numEventTimeTimers("ciao"));
+
+ ByteArrayOutputStream outStream = new ByteArrayOutputStream();
+ timerService.snapshotTimers(outStream);
+ outStream.close();
+
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable2 = mock(Triggerable.class);
+
+ keyContext = new TestKeyContext();
+ processingTimeService = new TestProcessingTimeService();
+
+ timerService = restoreTimerService(
+ new ByteArrayInputStream(outStream.toByteArray()),
+ mockTriggerable2,
+ keyContext,
+ processingTimeService);
+
+ processingTimeService.setCurrentTime(10);
+ timerService.advanceWatermark(10);
+
+ verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer());
+ verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 0, "ciao")));
+ verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, 1, "hello")));
+ verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer());
+ verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 0, "hello")));
+ verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, 1, "ciao")));
+
+ assertEquals(0, timerService.numEventTimeTimers());
+ }
+
+
+ private static class TestKeyContext implements KeyContext {
+
+ private Object key;
+
+ @Override
+ public void setCurrentKey(Object key) {
+ this.key = key;
+ }
+
+ @Override
+ public Object getCurrentKey() {
+ return key;
+ }
+ }
+
+ private static HeapInternalTimerService<Integer, String> createTimerService(
+ Triggerable<Integer, String> triggerable,
+ KeyContext keyContext,
+ ProcessingTimeService processingTimeService) {
+ return new HeapInternalTimerService<>(
+ IntSerializer.INSTANCE,
+ StringSerializer.INSTANCE,
+ triggerable,
+ keyContext,
+ processingTimeService);
+ }
+
+ private static HeapInternalTimerService<Integer, String> restoreTimerService(
+ InputStream stateStream,
+ Triggerable<Integer, String> triggerable,
+ KeyContext keyContext,
+ ProcessingTimeService processingTimeService) throws Exception {
+ HeapInternalTimerService.RestoredTimers<Integer, String> restoredTimers =
+ new HeapInternalTimerService.RestoredTimers<>(
+ stateStream,
+ HeapInternalTimerServiceTest.class.getClassLoader());
+
+ return new HeapInternalTimerService<>(
+ IntSerializer.INSTANCE,
+ StringSerializer.INSTANCE,
+ triggerable,
+ keyContext,
+ processingTimeService,
+ restoredTimers);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/81b19e53/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
new file mode 100644
index 0000000..6edf20a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link StreamTimelyFlatMap}.
+ */
+public class TimelyFlatMapTest extends TestLogger {
+
+ @Test
+ public void testCurrentEventTime() throws Exception {
+     // QueryingFlatMapFunction emits "<value>TIME:<current watermark>" for every element.
+     final StreamTimelyFlatMap<Integer, Integer, String> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, String> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new Watermark(17L));
+     expected.add(new StreamRecord<>("5TIME:17", 12L));
+     expected.add(new Watermark(42L));
+     expected.add(new StreamRecord<>("6TIME:42", 13L));
+
+     harness.setup();
+     harness.open();
+
+     harness.processWatermark(new Watermark(17));
+     harness.processElement(new StreamRecord<>(5, 12L));
+
+     harness.processWatermark(new Watermark(42));
+     harness.processElement(new StreamRecord<>(6, 13L));
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ @Test
+ public void testCurrentProcessingTime() throws Exception {
+     // QueryingFlatMapFunction emits "<value>TIME:<current processing time>" for every element.
+     final StreamTimelyFlatMap<Integer, Integer, String> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, String> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new StreamRecord<>("5TIME:17"));
+     expected.add(new StreamRecord<>("6TIME:42"));
+
+     harness.setup();
+     harness.open();
+
+     harness.setProcessingTime(17);
+     harness.processElement(new StreamRecord<>(5));
+
+     harness.setProcessingTime(42);
+     harness.processElement(new StreamRecord<>(6));
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ @Test
+ public void testEventTimeTimers() throws Exception {
+     // TriggeringFlatMapFunction forwards each element and registers a timer at watermark + 5.
+     final StreamTimelyFlatMap<Integer, Integer, Integer> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new Watermark(0L));
+     expected.add(new StreamRecord<>(17, 42L));
+     expected.add(new StreamRecord<>(1777, 5L));
+     expected.add(new Watermark(5L));
+
+     harness.setup();
+     harness.open();
+
+     harness.processWatermark(new Watermark(0));
+     harness.processElement(new StreamRecord<>(17, 42L));
+
+     // the timer output (1777) is expected ahead of the watermark that triggered it
+     harness.processWatermark(new Watermark(5));
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ @Test
+ public void testProcessingTimeTimers() throws Exception {
+     // TriggeringFlatMapFunction forwards each element and registers a timer at processing time + 5.
+     final StreamTimelyFlatMap<Integer, Integer, Integer> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, Integer> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new StreamRecord<>(17));
+     expected.add(new StreamRecord<>(1777, 5L));
+
+     harness.setup();
+     harness.open();
+
+     harness.processElement(new StreamRecord<>(17));
+     // advancing the processing time to 5 is expected to fire the timer registered above
+     harness.setProcessingTime(5);
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ /**
+  * Verifies that an event-time timer only sees the state of the key it was registered for.
+  */
+ @Test
+ public void testEventTimeTimerWithState() throws Exception {
+     final StreamTimelyFlatMap<Integer, Integer, String> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, String> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new Watermark(1L));
+     expected.add(new StreamRecord<>("INPUT:17", 0L));
+     expected.add(new Watermark(2L));
+     expected.add(new StreamRecord<>("INPUT:42", 1L));
+     expected.add(new StreamRecord<>("STATE:17", 6L));
+     expected.add(new Watermark(6L));
+     expected.add(new StreamRecord<>("STATE:42", 7L));
+     expected.add(new Watermark(7L));
+
+     harness.setup();
+     harness.open();
+
+     harness.processWatermark(new Watermark(1));
+     harness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+     harness.processWatermark(new Watermark(2));
+     harness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+     // each of the two watermarks is expected to fire exactly one of the timers
+     harness.processWatermark(new Watermark(6));
+     harness.processWatermark(new Watermark(7));
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ /**
+  * Verifies that a processing-time timer only sees the state of the key it was registered for.
+  */
+ @Test
+ public void testProcessingTimeTimerWithState() throws Exception {
+     final StreamTimelyFlatMap<Integer, Integer, String> flatMapOperator = new StreamTimelyFlatMap<>(
+         IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+     final OneInputStreamOperatorTestHarness<Integer, String> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             flatMapOperator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     final ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+     expected.add(new StreamRecord<>("INPUT:17"));
+     expected.add(new StreamRecord<>("INPUT:42"));
+     expected.add(new StreamRecord<>("STATE:17", 6L));
+     expected.add(new StreamRecord<>("STATE:42", 7L));
+
+     harness.setup();
+     harness.open();
+
+     harness.setProcessingTime(1);
+     harness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+     harness.setProcessingTime(2);
+     harness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+     // each of the two time advances is expected to fire exactly one of the timers
+     harness.setProcessingTime(6);
+     harness.setProcessingTime(7);
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.close();
+ }
+
+ /**
+  * Verifies that timers registered before a snapshot still fire after restoring from it.
+  */
+ @Test
+ public void testSnapshotAndRestore() throws Exception {
+     StreamTimelyFlatMap<Integer, Integer, String> operator =
+         new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+
+     OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+         new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     testHarness.setup();
+     testHarness.open();
+
+     // registers a processing-time timer for 5 and an event-time timer for 6
+     testHarness.processElement(new StreamRecord<>(5, 12L));
+
+     // snapshot and restore from scratch
+     StreamStateHandle snapshot = testHarness.snapshotLegacy(0, 0);
+     testHarness.close();
+
+     operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+     testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+     testHarness.setup();
+     testHarness.restore(snapshot);
+     testHarness.open();
+
+     // no element is processed after the restore, so all output must stem from the restored timers
+     testHarness.setProcessingTime(5);
+     testHarness.processWatermark(new Watermark(6));
+
+     ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+     expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+     expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+     expectedOutput.add(new Watermark(6));
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+     testHarness.close();
+ }
+
+ private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+ private static final long serialVersionUID = 1L;
+ // Identity mapping: every element is returned unchanged as its own key.
+ @Override
+ public T getKey(T value) throws Exception {
+ return value;
+ }
+ }
+
+ /** Emits each input value tagged with the current watermark or processing time; never sets timers. */
+ private static class QueryingFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
+
+     private static final long serialVersionUID = 1L;
+
+     private final TimeDomain queriedDomain;
+
+     public QueryingFlatMapFunction(TimeDomain timeDomain) {
+         this.queriedDomain = timeDomain;
+     }
+
+     @Override
+     public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+         final boolean useEventTime = queriedDomain.equals(TimeDomain.EVENT_TIME);
+         final long now = useEventTime
+             ? timerService.currentWatermark()
+             : timerService.currentProcessingTime();
+         out.collect(value + "TIME:" + now);
+     }
+
+     @Override
+     public void onTimer(
+             long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<String> out)
+             throws Exception {
+         // intentionally empty: this function never registers a timer
+     }
+ }
+
+ /** Forwards every element and registers a timer five units ahead in the configured time domain. */
+ private static class TriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, Integer> {
+
+     private static final long serialVersionUID = 1L;
+
+     private final TimeDomain timerDomain;
+
+     public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+         this.timerDomain = timeDomain;
+     }
+
+     @Override
+     public void flatMap(Integer value, TimerService timerService, Collector<Integer> out) throws Exception {
+         out.collect(value);
+         // anything but EVENT_TIME is handled as processing time
+         if (!timerDomain.equals(TimeDomain.EVENT_TIME)) {
+             timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+             return;
+         }
+         timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+     }
+
+     @Override
+     public void onTimer(
+             long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<Integer> out)
+             throws Exception {
+         // the timer must fire in the same domain it was registered in
+         assertEquals(timerDomain, timeDomain);
+         out.collect(1777);
+     }
+ }
+
+ /** Stores each element in a value state and emits that state's content when a timer fires. */
+ private static class TriggeringStatefulFlatMapFunction extends RichTimelyFlatMapFunction<Integer, String> {
+
+     private static final long serialVersionUID = 1L;
+
+     private final ValueStateDescriptor<Integer> seenElement =
+         new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+
+     private final TimeDomain timerDomain;
+
+     public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+         this.timerDomain = timeDomain;
+     }
+
+     @Override
+     public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+         out.collect("INPUT:" + value);
+         getRuntimeContext().getState(seenElement).update(value);
+         if (!timerDomain.equals(TimeDomain.EVENT_TIME)) {
+             timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5);
+             return;
+         }
+         timerService.registerEventTimeTimer(timerService.currentWatermark() + 5);
+     }
+
+     @Override
+     public void onTimer(
+             long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<String> out)
+             throws Exception {
+         assertEquals(timerDomain, timeDomain);
+         final Integer stored = getRuntimeContext().getState(seenElement).value();
+         out.collect("STATE:" + stored);
+     }
+ }
+
+ /** Registers both a processing-time and an event-time timer and reports which one fired. */
+ private static class BothTriggeringFlatMapFunction implements TimelyFlatMapFunction<Integer, String> {
+
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public void flatMap(Integer value, TimerService timerService, Collector<String> out) throws Exception {
+         // fixed timestamps, independent of the element and of the current time
+         timerService.registerProcessingTimeTimer(5);
+         timerService.registerEventTimeTimer(6);
+     }
+
+     @Override
+     public void onTimer(
+             long timestamp,
+             TimeDomain timeDomain,
+             TimerService timerService,
+             Collector<String> out) throws Exception {
+         final String firedLabel = TimeDomain.EVENT_TIME.equals(timeDomain)
+             ? "EVENT:1777"
+             : "PROC:1777";
+         out.collect(firedLabel);
+     }
+ }
+
+}