You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/18 07:13:44 UTC
[05/11] flink git commit: [FLINK-4460] Provide late-data output for
window operations
[FLINK-4460] Provide late-data output for window operations
We use side outputs to emit dropped late data.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/07a15d0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/07a15d0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/07a15d0e
Branch: refs/heads/master
Commit: 07a15d0e1647c79ae010ca6df5b1830a4087dd56
Parents: e134d27
Author: Chen Qin <qi...@gmail.com>
Authored: Wed Mar 1 15:36:17 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Sat Mar 18 07:44:17 2017 +0100
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 79 ++++++--
.../api/datastream/WindowedStream.java | 75 +++++--
.../windowing/EvictingWindowOperator.java | 10 +-
.../operators/windowing/WindowOperator.java | 54 ++++-
.../windowing/EvictingWindowOperatorTest.java | 27 ++-
.../windowing/WindowOperatorContractTest.java | 3 +-
.../windowing/WindowOperatorMigrationTest.java | 20 +-
.../operators/windowing/WindowOperatorTest.java | 202 +++++++++++++------
.../streaming/api/scala/AllWindowedStream.scala | 16 +-
.../streaming/api/scala/WindowedStream.scala | 18 +-
.../streaming/runtime/SideOutputITCase.java | 173 ++++++++++++++++
11 files changed, 553 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index a45cb0a..50f0f85 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -63,6 +63,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.functions.Internal
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -106,6 +108,12 @@ public class AllWindowedStream<T, W extends Window> {
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
+ /**
+ * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
+ * dropped.
+ */
+ private OutputTag<T> lateDataOutputTag;
+
@PublicEvolving
public AllWindowedStream(DataStream<T> input,
WindowAssigner<? super T, W> windowAssigner) {
@@ -144,6 +152,23 @@ public class AllWindowedStream<T, W extends Window> {
}
/**
+ * Send late arriving data to the side output identified by the given {@link OutputTag}. Data
+ * is considered late after the watermark has passed the end of the window plus the allowed
+ * lateness set using {@link #allowedLateness(Time)}.
+ *
+ * <p>You can get the stream of late data using
+ * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+ * {@link SingleOutputStreamOperator} resulting from the windowed operation
+ * with the same {@link OutputTag}.
+ */
+ @PublicEvolving
+ public AllWindowedStream<T, W> sideOutputLateData(OutputTag<T> outputTag) {
+ Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+ this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
+ return this;
+ }
+
+ /**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
* <p>
@@ -271,7 +296,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -288,7 +314,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -367,7 +394,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableProcessAllWindowFunction<>(new ReduceApplyProcessAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -384,7 +412,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -562,7 +591,8 @@ public class AllWindowedStream<T, W extends Window> {
new AggregateApplyAllWindowFunction<>(aggregateFunction, windowFunction)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
@@ -580,7 +610,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(windowFunction),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -693,7 +724,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalAggregateProcessAllWindowFunction<>(aggregateFunction, windowFunction),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>(
@@ -711,7 +743,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(windowFunction),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -842,7 +875,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -858,7 +892,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -948,7 +983,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableProcessAllWindowFunction<>(new FoldApplyProcessAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -964,7 +1000,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueProcessAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1080,7 +1117,8 @@ public class AllWindowedStream<T, W extends Window> {
function,
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -1096,7 +1134,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
function,
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1177,7 +1216,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new ReduceApplyAllWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -1194,7 +1234,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
@@ -1280,7 +1321,8 @@ public class AllWindowedStream<T, W extends Window> {
new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, resultType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -1296,7 +1338,8 @@ public class AllWindowedStream<T, W extends Window> {
stateDesc,
new InternalSingleValueAllWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator).forceNonParallel();
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 164e47e..334851e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -73,6 +73,8 @@ import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -120,6 +122,12 @@ public class WindowedStream<T, K, W extends Window> {
/** The user-specified allowed lateness. */
private long allowedLateness = 0L;
+ /**
+ * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
+ * dropped.
+ */
+ private OutputTag<T> lateDataOutputTag;
+
@PublicEvolving
public WindowedStream(KeyedStream<T, K> input,
WindowAssigner<? super T, W> windowAssigner) {
@@ -162,6 +170,23 @@ public class WindowedStream<T, K, W extends Window> {
}
/**
+ * Send late arriving data to the side output identified by the given {@link OutputTag}. Data
+ * is considered late after the watermark has passed the end of the window plus the allowed
+ * lateness set using {@link #allowedLateness(Time)}.
+ *
+ * <p>You can get the stream of late data using
+ * {@link SingleOutputStreamOperator#getSideOutput(OutputTag)} on the
+ * {@link SingleOutputStreamOperator} resulting from the windowed operation
+ * with the same {@link OutputTag}.
+ */
+ @PublicEvolving
+ public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
+ Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+ this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
+ return this;
+ }
+
+ /**
* Sets the {@code Evictor} that should be used to evict elements from a window before emission.
*
* <p>
@@ -344,7 +369,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -362,6 +388,7 @@ public class WindowedStream<T, K, W extends Window> {
new InternalSingleValueWindowFunction<>(function),
trigger,
allowedLateness,
+ lateDataOutputTag,
legacyWindowOpType);
}
@@ -437,7 +464,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableProcessWindowFunction<>(new ReduceApplyProcessWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -454,7 +482,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -589,7 +618,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -604,7 +634,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -697,7 +728,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableProcessWindowFunction<>(new FoldApplyProcessWindowFunction<>(initialValue, foldFunction, windowFunction, foldResultType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, ACC> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -715,7 +747,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(windowFunction),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, windowResultType, operator);
@@ -890,7 +923,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new AggregateApplyWindowFunction<>(aggregateFunction, windowFunction)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
@@ -905,7 +939,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(windowFunction),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -1017,7 +1052,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalAggregateProcessWindowFunction<>(aggregateFunction, windowFunction),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
AggregatingStateDescriptor<T, ACC, V> stateDesc = new AggregatingStateDescriptor<>("window-contents",
@@ -1032,7 +1068,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(windowFunction),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -1154,7 +1191,8 @@ public class WindowedStream<T, K, W extends Window> {
function,
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
@@ -1171,6 +1209,7 @@ public class WindowedStream<T, K, W extends Window> {
function,
trigger,
allowedLateness,
+ lateDataOutputTag,
legacyWindowOpType);
}
@@ -1252,7 +1291,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
@@ -1269,7 +1309,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
@@ -1354,7 +1395,8 @@ public class WindowedStream<T, K, W extends Window> {
new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, resultType)),
trigger,
evictor,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
} else {
FoldingStateDescriptor<T, R> stateDesc = new FoldingStateDescriptor<>("window-contents",
@@ -1369,7 +1411,8 @@ public class WindowedStream<T, K, W extends Window> {
stateDesc,
new InternalSingleValueWindowFunction<>(function),
trigger,
- allowedLateness);
+ allowedLateness,
+ lateDataOutputTag);
}
return input.transform(opName, resultType, operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 45fea14..8dfc717 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import java.util.Collection;
@@ -86,10 +87,11 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
InternalWindowFunction<Iterable<IN>, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
Evictor<? super IN, ? super W> evictor,
- long allowedLateness) {
+ long allowedLateness,
+ OutputTag<IN> lateDataOutputTag) {
super(windowAssigner, windowSerializer, keySelector,
- keySerializer, null, windowFunction, trigger, allowedLateness);
+ keySerializer, null, windowFunction, trigger, allowedLateness, lateDataOutputTag);
this.evictor = checkNotNull(evictor);
this.evictingWindowStateDescriptor = checkNotNull(windowStateDescriptor);
@@ -137,7 +139,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
});
// check if the window is already inactive
- if (isLate(actualWindow)) {
+ if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
@@ -177,7 +179,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window>
for (W window : elementWindows) {
// check if the window is already inactive
- if (isLate(window)) {
+ if (isWindowLate(window)) {
continue;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 3c4f397..9ce1ae7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -29,6 +29,7 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
@@ -132,6 +133,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
*/
private final long allowedLateness;
+ /**
+ * {@link OutputTag} to use for late arriving events. Elements for which
+ * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
+ * be emitted to this.
+ */
+ private final OutputTag<IN> lateDataOutputTag;
+
// ------------------------------------------------------------------------
// State that is not checkpointed
// ------------------------------------------------------------------------
@@ -200,10 +208,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
- long allowedLateness) {
+ long allowedLateness,
+ OutputTag<IN> lateDataOutputTag) {
this(windowAssigner, windowSerializer, keySelector, keySerializer,
- windowStateDescriptor, windowFunction, trigger, allowedLateness, LegacyWindowOperatorType.NONE);
+ windowStateDescriptor, windowFunction, trigger, allowedLateness, lateDataOutputTag, LegacyWindowOperatorType.NONE);
}
/**
@@ -218,6 +227,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
InternalWindowFunction<ACC, OUT, K, W> windowFunction,
Trigger<? super IN, ? super W> trigger,
long allowedLateness,
+ OutputTag<IN> lateDataOutputTag,
LegacyWindowOperatorType legacyWindowOperatorType) {
super(windowFunction);
@@ -239,6 +249,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
this.windowStateDescriptor = windowStateDescriptor;
this.trigger = checkNotNull(trigger);
this.allowedLateness = allowedLateness;
+ this.lateDataOutputTag = lateDataOutputTag;
this.legacyWindowOperatorType = legacyWindowOperatorType;
setChainingStrategy(ChainingStrategy.ALWAYS);
@@ -323,6 +334,9 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
+ // stays true as long as the element has not been added to any of its assigned windows
+ boolean isSkippedElement = true;
+
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
@@ -355,10 +369,11 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
});
// drop if the window is already late
- if (isLate(actualWindow)) {
+ if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
+ isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
@@ -393,9 +408,10 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
for (W window: elementWindows) {
// drop if the window is already late
- if (isLate(window)) {
+ if (isWindowLate(window)) {
continue;
}
+ isSkippedElement = false;
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
@@ -419,6 +435,14 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
registerCleanupTimer(window);
}
}
+
+ // emit the input element to the late-data side output if all of the following hold:
+ // - the element was not added to any of its assigned windows
+ // - a late-data output tag has been set
+ // - the windowAssigner is event time and element timestamp + allowed lateness is not greater than the current watermark
+ if (isSkippedElement && lateDataOutputTag != null && isElementLate(element)) {
+ sideOutput(element);
+ }
}
@Override
@@ -546,6 +570,15 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
}
/**
+ * Emits a skipped, late-arriving element to the side output identified by the late-data output tag.
+ *
+ * @param element the skipped late-arriving element to emit to the side output
+ */
+ private void sideOutput(StreamRecord<IN> element){
+ output.collect(lateDataOutputTag, element);
+ }
+
+ /**
* Retrieves the {@link MergingWindowSet} for the currently active key.
* The caller must ensure that the correct key is set in the state backend.
*
@@ -562,11 +595,22 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
* Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness
* of the given window.
*/
- protected boolean isLate(W window) {
+ protected boolean isWindowLate(W window) {
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
/**
+ * Decide if a record is currently late, based on current watermark and allowed lateness.
+ *
+ * @param element The element to check
+ * @return {@code true} if the window assigner is event-time and the element timestamp plus the allowed lateness is at or before the current watermark
+ */
+ protected boolean isElementLate(StreamRecord<IN> element){
+ return (windowAssigner.isEventTime()) &&
+ (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
+ }
+
+ /**
* Registers a timer to cleanup the content of the window.
* @param window
* the window whose state to discard
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
index 8da1d7c..e9d63de 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperatorTest.java
@@ -89,7 +89,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
CountEvictor.of(WINDOW_SIZE,EVICT_AFTER),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -169,7 +170,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -243,7 +245,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2)),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -319,7 +322,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(TRIGGER_COUNT),
TimeEvictor.of(Time.seconds(2), EVICT_AFTER),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -398,7 +402,8 @@ public class EvictingWindowOperatorTest {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, EVICT_AFTER),
- 0);
+ 0,
+ null /* late data output tag */);
@@ -475,7 +480,8 @@ public class EvictingWindowOperatorTest {
return newDataPoint.f1 - oldDataPoint.f1;
}
}, EVICT_AFTER),
- 0);
+ 0,
+ null /* late data output tag */);
@@ -543,7 +549,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new ReduceIterableWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer())),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -615,7 +622,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<GlobalWindow>(closeCalled)),
CountTrigger.of(WINDOW_SLIDE),
CountEvictor.of(WINDOW_SIZE),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -686,7 +694,8 @@ public class EvictingWindowOperatorTest {
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>(closeCalled)),
EventTimeTrigger.create(),
CountEvictor.of(WINDOW_SIZE),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index 7c4d711..f70990f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -2322,7 +2322,8 @@ public class WindowOperatorContractTest extends TestLogger {
stateDescriptor,
windowFunction,
trigger,
- allowedLatenss);
+ allowedLatenss,
+ null /*late data output tag */);
return new KeyedOneInputStreamOperatorTestHarness<>(
operator,
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
index 7a356cf..19fa04f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorMigrationTest.java
@@ -100,7 +100,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -181,7 +182,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -258,7 +260,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -346,7 +349,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -435,7 +439,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -510,7 +515,8 @@ public class WindowOperatorMigrationTest {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
ProcessingTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -633,6 +639,7 @@ public class WindowOperatorMigrationTest {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
0,
+ null /* late data output tag */,
LegacyWindowOperatorType.FAST_AGGREGATING);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
@@ -735,6 +742,7 @@ public class WindowOperatorMigrationTest {
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
ProcessingTimeTrigger.create(),
0,
+ null /* late data output tag */,
LegacyWindowOperatorType.FAST_ACCUMULATING);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index a9c3ef6..b38cb2e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -92,6 +93,9 @@ public class WindowOperatorTest extends TestLogger {
// For counting if close() is called the correct number of times on the SumReducer
private static AtomicInteger closeCalled = new AtomicInteger(0);
+ // OutputTag under which late-arriving elements are emitted as a side output
+ private static final OutputTag<Tuple2<String, Integer>> lateOutputTag = new OutputTag<Tuple2<String, Integer>>("late-output") {};
+
private void testSlidingEventTimeWindows(OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness) throws Exception {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
@@ -181,7 +185,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -215,7 +220,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -314,7 +320,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -346,7 +353,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new RichSumReducer<TimeWindow>()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -381,7 +389,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -453,7 +462,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableProcessWindowFunction<>(new SessionProcessWindowFunction()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -525,7 +535,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -595,7 +606,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueProcessWindowFunction<>(new ReducedProcessSessionWindowFunction()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -669,7 +681,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
PurgingTrigger.of(CountTrigger.of(4)),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -739,7 +752,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
ContinuousEventTimeTrigger.of(Time.seconds(2)),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -843,7 +857,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalIterableWindowFunction<>(new SessionWindowFunction()),
EventTimeTrigger.create(),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -901,7 +916,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
ContinuousEventTimeTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -989,7 +1005,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
- 0);
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1030,7 +1047,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(CountTrigger.of(WINDOW_SIZE)),
- 0);
+ 0,
+ null /* late data output tag */);
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1075,7 +1093,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- ProcessingTimeTrigger.create(), 0);
+ ProcessingTimeTrigger.create(),
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1132,7 +1152,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- ProcessingTimeTrigger.create(), 0);
+ ProcessingTimeTrigger.create(),
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1202,7 +1224,9 @@ public class WindowOperatorTest extends TestLogger {
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
- ProcessingTimeTrigger.create(), 0);
+ ProcessingTimeTrigger.create(),
+ 0,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1267,7 +1291,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
PurgingTrigger.of(EventTimeTrigger.create()),
- LATENESS);
+ LATENESS,
+ lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1275,6 +1300,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Object> lateExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 500));
testHarness.processWatermark(new Watermark(1500));
@@ -1287,7 +1313,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
expected.add(new Watermark(2300));
- // this will not be dropped because window.maxTimestamp() + allowedLateness > currentWatermark
+ // this will not be side output because window.maxTimestamp() + allowedLateness > currentWatermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1997));
testHarness.processWatermark(new Watermark(6000));
@@ -1295,13 +1321,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1999));
expected.add(new Watermark(6000));
- // this will be dropped because window.maxTimestamp() + allowedLateness < currentWatermark
+ // this will be side output because window.maxTimestamp() + allowedLateness < currentWatermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processWatermark(new Watermark(7000));
+ lateExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
expected.add(new Watermark(7000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+
+ TestHarnessUtil.assertOutputEqualsSorted(
+ "SideOutput was not correct.",
+ lateExpected,
+ testHarness.getSideOutput(lateOutputTag),
+ new Tuple2ResultSortComparator());
+
testHarness.close();
}
@@ -1327,7 +1361,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1374,7 +1409,7 @@ public class WindowOperatorTest extends TestLogger {
}
@Test
- public void testDropDueToLatenessTumbling() throws Exception {
+ public void testSideOutputDueToLatenessTumbling() throws Exception {
final int WINDOW_SIZE = 2;
final long LATENESS = 0;
@@ -1393,7 +1428,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1401,6 +1437,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
// normal element
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
@@ -1415,8 +1452,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple2<>("key2", 2), 1999));
expected.add(new Watermark(1999));
- // dropped as late
+ // emitted to the side output as late; the record keeps its original timestamp since only the input tuple is side-output
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
+ sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 1998));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 2001));
testHarness.processWatermark(new Watermark(2999));
@@ -1429,11 +1467,13 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(3999));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(
+ lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
- public void testDropDueToLatenessSliding() throws Exception {
+ public void testSideOutputDueToLatenessSliding() throws Exception {
final int WINDOW_SIZE = 3;
final int WINDOW_SLIDE = 1;
final long LATENESS = 0;
@@ -1453,7 +1493,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ lateOutputTag /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1461,6 +1502,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
@@ -1497,19 +1539,21 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(6000));
- // dropped due to lateness
+ // sideoutput element due to lateness
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
+ sideExpected.add(new StreamRecord<>(new Tuple2<>("key1", 1), 3001));
testHarness.processWatermark(new Watermark(25000));
expected.add(new Watermark(25000));
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, testHarness.getOutput(), new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, testHarness.getSideOutput(lateOutputTag), new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
- public void testDropDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
+ public void testSideOutputDueToLatenessSessionZeroLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 0;
@@ -1528,7 +1572,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
- LATENESS);
+ LATENESS,
+ lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1536,6 +1581,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
@@ -1571,10 +1617,13 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- // this is dropped as late
+ // this is side output as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
- // this is also dropped as late (we test that they are not accidentally merged)
+ sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+
+ // this is also side output as late (we test that they are not accidentally merged)
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
+ sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
@@ -1587,19 +1636,16 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+ ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
+
testHarness.close();
}
@Test
- public void testDropDueToLatenessSessionZeroLateness() throws Exception {
- // same as testDropDueToLatenessSessionZeroLateness() but with an accumulating trigger, i.e.
- // one that does not return FIRE_AND_PURGE when firing but just FIRE
-
- // this has the same output as testDropDueToLatenessSessionZeroLateness() because
- // accumulating/discarding does not make a difference with "allowed lateness" = 0.
-
+ public void testSideOutputDueToLatenessSessionZeroLateness() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 0;
@@ -1618,7 +1664,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1626,6 +1673,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
+ ConcurrentLinkedQueue<Object> sideExpected = new ConcurrentLinkedQueue<>();
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
@@ -1661,8 +1709,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 11600L, 14600L), 14599));
expected.add(new Watermark(14600));
- // this is dropped as late
+ // this is side output as late; the record keeps its original timestamp
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
+ sideExpected.add(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
@@ -1674,14 +1723,16 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+ ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple2ResultSortComparator());
+ TestHarnessUtil.assertOutputEqualsSorted("SideOutput was not correct.", sideExpected, sideActual, new Tuple2ResultSortComparator());
testHarness.close();
}
@Test
public void testDropDueToLatenessSessionWithLatenessPurgingTrigger() throws Exception {
- // this has the same output as testDropDueToLatenessSessionZeroLateness() because
+ // this has the same output as testSideOutputDueToLatenessSessionZeroLateness() because
// the allowed lateness is too small to make a difference
final int GAP_SIZE = 3;
@@ -1702,7 +1753,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
- LATENESS);
+ LATENESS,
+ lateOutputTag);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1710,7 +1762,7 @@ public class WindowOperatorTest extends TestLogger {
testHarness.open();
ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<>();
-
+
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 1000));
testHarness.processWatermark(new Watermark(1999));
@@ -1759,13 +1811,14 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
testHarness.close();
}
@Test
- public void testDropDueToLatenessSessionWithLateness() throws Exception {
- // same as testDropDueToLatenessSessionWithLateness() but with an accumulating trigger, i.e.
+ public void testNotSideOutputDueToLatenessSessionWithLateness() throws Exception {
+ // same as testDropDueToLatenessSessionWithLatenessPurgingTrigger() but with an accumulating trigger, i.e.
// one that does not return FIRE_AND_PURGE when firing but just FIRE. The expected
// results are therefore slightly different.
@@ -1787,7 +1840,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1806,7 +1860,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
- // this will not be dropped because the session we're adding two has maxTimestamp
+ // this will not be sideoutput because the session we're adding to has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
@@ -1832,7 +1886,7 @@ public class WindowOperatorTest extends TestLogger {
// because of the small allowed lateness and because the trigger is accumulating
// this will be merged into the session (11600-14600) and therefore will not
- // be dropped as late
+ // be sideoutput as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
@@ -1843,7 +1897,10 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-2", 10000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+ ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
+
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
testHarness.processWatermark(new Watermark(20000));
@@ -1855,12 +1912,15 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
+ sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
+
testHarness.close();
}
@Test
- public void testDropDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
+ public void testNotSideOutputDueToLatenessSessionWithHugeLatenessPurgingTrigger() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
@@ -1880,7 +1940,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
PurgingTrigger.of(EventTimeTrigger.create()),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1899,7 +1960,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
- // this will not be dropped because the session we're adding two has maxTimestamp
+ // this will not be sideoutput because the session we're adding to has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
@@ -1928,7 +1989,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-1", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+ ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
@@ -1941,12 +2004,15 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
+ sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
+
testHarness.close();
}
@Test
- public void testDropDueToLatenessSessionWithHugeLateness() throws Exception {
+ public void testNotSideOutputDueToLatenessSessionWithHugeLateness() throws Exception {
final int GAP_SIZE = 3;
final long LATENESS = 10000;
@@ -1965,7 +2031,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -1984,7 +2051,7 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(4998));
- // this will not be dropped because the session we're adding two has maxTimestamp
+ // this will not be sideoutput because the session we're adding to has maxTimestamp
// after the current watermark
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 4500));
@@ -2015,7 +2082,9 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new StreamRecord<>(new Tuple3<>("key2-7", 1000L, 14600L), 14599));
ConcurrentLinkedQueue<Object> actual = testHarness.getOutput();
+ ConcurrentLinkedQueue<Object> sideActual = testHarness.getSideOutput(lateOutputTag);
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
@@ -2027,7 +2096,11 @@ public class WindowOperatorTest extends TestLogger {
expected.add(new Watermark(100000));
actual = testHarness.getOutput();
+ sideActual = testHarness.getSideOutput(lateOutputTag);
+
TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expected, actual, new Tuple3ResultSortComparator());
+ assertEquals(null, sideActual);
+
testHarness.close();
}
@@ -2050,7 +2123,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction2()),
new EventTimeTriggerAccumGC(LATENESS),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2104,7 +2178,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2150,7 +2225,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction<String, TimeWindow, Tuple2<String, Integer>>()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2207,7 +2283,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2252,7 +2329,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalIterableWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2296,7 +2374,8 @@ public class WindowOperatorTest extends TestLogger {
stateDesc,
new InternalSingleValueWindowFunction<>(new ReducedSessionWindowFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
@@ -2351,7 +2430,8 @@ public class WindowOperatorTest extends TestLogger {
windowStateDesc,
new InternalSingleValueWindowFunction<>(new PassThroughFunction()),
EventTimeTrigger.create(),
- LATENESS);
+ LATENESS,
+ null /* late data output tag */);
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple2<String, Integer>> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 694353c..757e45f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -30,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flink.util.Preconditions.checkNotNull
/**
@@ -72,6 +72,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
}
/**
+ * Sends late arriving data to the side output identified by the given [[OutputTag]]. Data
+ * is considered late after the watermark has passed the end of the window plus the allowed
+ * lateness set using [[allowedLateness(Time)]]. The late stream can be obtained by calling
+ * [[DataStream.getSideOutput()]] with the same tag on the result of the windowed operation.
+ * @param outputTag tag of the late-data side output; passed on to the wrapped Java stream
+ * @return this stream, to allow chaining of further calls
+ */
+ @PublicEvolving
+ def sideOutputLateData(outputTag: OutputTag[T]): AllWindowedStream[T, W] = {
+ javaStream.sideOutputLateData(outputTag)
+ this
+ }
+
+ /**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 32a9f60..4e0e1a4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -19,8 +19,6 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{Public, PublicEvolving}
-import org.apache.flink.api.common.functions.{FoldFunction, ReduceFunction}
-import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.api.common.functions.{AggregateFunction, FoldFunction, ReduceFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
@@ -32,7 +30,7 @@ import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
-import org.apache.flink.util.Collector
+import org.apache.flink.util.{Collector, OutputTag}
/**
* A [[WindowedStream]] represents a data stream where elements are grouped by
@@ -76,6 +74,20 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
}
/**
+ * Sends late arriving data to the side output identified by the given [[OutputTag]]. Data
+ * is considered late after the watermark has passed the end of the window plus the allowed
+ * lateness set using [[allowedLateness(Time)]]. The late stream can be obtained by calling
+ * [[DataStream.getSideOutput()]] with the same tag on the result of the windowed operation.
+ * @param outputTag tag of the late-data side output; passed on to the wrapped Java stream
+ * @return this stream, to allow chaining of further calls
+ */
+ @PublicEvolving
+ def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
+ javaStream.sideOutputLateData(outputTag)
+ this
+ }
+
+ /**
* Sets the [[Trigger]] that should be used to trigger window emission.
*/
@PublicEvolving
http://git-wip-us.apache.org/repos/asf/flink/blob/07a15d0e/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
new file mode 100644
index 0000000..2f92897
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Integration test for streaming programs using side outputs.
+ */
+public class SideOutputITCase extends StreamingMultipleProgramsTestBase {
+
+    /** Test input; every value also serves as its own event timestamp. */
+    static List<Integer> elements = new ArrayList<>();
+    static {
+        elements.add(1);
+        elements.add(2);
+        elements.add(5);
+        elements.add(3);
+        elements.add(4);
+    }
+
+    /** Uses each element's value as its timestamp and returns a matching watermark for it. */
+    private static class TestWatermarkAssigner implements AssignerWithPunctuatedWatermarks<Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Nullable
+        @Override
+        public Watermark checkAndGetNextWatermark(Integer lastElement, long extractedTimestamp) {
+            return new Watermark(extractedTimestamp);
+        }
+
+        @Override
+        public long extractTimestamp(Integer element, long previousElementTimestamp) {
+            return element.longValue();
+        }
+    }
+
+    /** Keys every element by its own value. */
+    private static class TestKeySelector implements KeySelector<Integer, Integer> {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public Integer getKey(Integer value) throws Exception {
+            return value;
+        }
+    }
+
+    /**
+     * Checks that a non-keyed window that does not configure allowed lateness emits elements
+     * arriving behind the watermark to the configured late-data side output.
+     */
+    @Test
+    public void testAllWindowLateArrivingEvents() throws Exception {
+        TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+        see.setParallelism(1);
+        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        DataStream<Integer> dataStream = see.fromCollection(elements);
+
+        OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
+
+        SingleOutputStreamOperator<Integer> windowOperator = dataStream
+                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+                .timeWindowAll(Time.milliseconds(1), Time.milliseconds(1))
+                .sideOutputLateData(lateDataTag)
+                .apply(new AllWindowFunction<Integer, Integer, TimeWindow>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void apply(TimeWindow window, Iterable<Integer> values, Collector<Integer> out) throws Exception {
+                        for (Integer val : values) {
+                            out.collect(val);
+                        }
+                    }
+                });
+
+        windowOperator.getSideOutput(lateDataTag)
+                .flatMap(new FlatMapFunction<Integer, String>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void flatMap(Integer value, Collector<String> out) throws Exception {
+                        out.collect("late-" + value);
+                    }
+                })
+                .addSink(sideOutputResultSink);
+
+        see.execute();
+        // expected value goes first so that a failure message labels the two lists correctly
+        assertEquals(Arrays.asList("late-3", "late-4"), sideOutputResultSink.getSortedResult());
+    }
+
+    /** Checks the late-data side output of a keyed window that allows 2 ms of lateness. */
+    @Test
+    public void testKeyedWindowLateArrivingEvents() throws Exception {
+        TestListResultSink<String> resultSink = new TestListResultSink<>();
+        TestListResultSink<Integer> lateResultSink = new TestListResultSink<>();
+
+        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+        see.setParallelism(3);
+        see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+        DataStream<Integer> dataStream = see.fromCollection(elements);
+
+        OutputTag<Integer> lateDataTag = new OutputTag<Integer>("late"){};
+
+        SingleOutputStreamOperator<String> windowOperator = dataStream
+                .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+                .keyBy(new TestKeySelector())
+                .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
+                .allowedLateness(Time.milliseconds(2))
+                .sideOutputLateData(lateDataTag)
+                .apply(new WindowFunction<Integer, String, Integer, TimeWindow>() {
+                    private static final long serialVersionUID = 1L;
+
+                    @Override
+                    public void apply(Integer key, TimeWindow window, Iterable<Integer> input, Collector<String> out) throws Exception {
+                        for (Integer val : input) {
+                            out.collect(key + "-" + val);
+                        }
+                    }
+                });
+
+        windowOperator.addSink(resultSink);
+        windowOperator.getSideOutput(lateDataTag).addSink(lateResultSink);
+
+        see.execute();
+        assertEquals(Arrays.asList("1-1", "2-2", "4-4", "5-5"), resultSink.getSortedResult());
+        // 3 is the only element expected on the late-data side output
+        assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
+
+}