You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/31 10:30:54 UTC
flink git commit: [FLINK-6205] [FLINK-6069] [cep] Correct
watermark/late events in side output.
Repository: flink
Updated Branches:
refs/heads/master 193224017 -> 48890285d
[FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side output.
With this, the CEP library assumes correctness of the watermark
and considers as late any event that arrives with a timestamp
smaller than that of the last seen watermark. Late events are
dropped by default, but the user can choose to send them to a
side output.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48890285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48890285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48890285
Branch: refs/heads/master
Commit: 48890285d4b1c285bebb971ae0dbfc310c6fcc0e
Parents: 1932240
Author: kl0u <kk...@gmail.com>
Authored: Thu Mar 23 19:01:15 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 31 11:16:50 2017 +0200
----------------------------------------------------------------------
docs/dev/libs/cep.md | 53 ++++++++++
.../apache/flink/cep/scala/PatternStream.scala | 37 ++++++-
.../org/apache/flink/cep/PatternStream.java | 72 +++++++++++--
.../AbstractKeyedCEPPatternOperator.java | 69 ++++++++++---
.../flink/cep/operator/CEPOperatorUtils.java | 17 ++-
.../cep/operator/KeyedCEPPatternOperator.java | 4 +-
.../TimeoutKeyedCEPPatternOperator.java | 4 +-
.../java/org/apache/flink/cep/CEPITCase.java | 103 +++++++++++++++++++
.../cep/operator/CEPMigration11to13Test.java | 2 +
.../cep/operator/CEPMigration12to13Test.java | 3 +
.../flink/cep/operator/CEPOperatorTest.java | 2 +
.../flink/cep/operator/CEPRescalingTest.java | 1 +
12 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index bb704c7..643d6ee 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -777,6 +777,59 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
</div>
</div>
+### Handling Lateness in Event Time
+
+In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order
+when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending
+order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller
+than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.
+
+<span class="label label-danger">Attention</span> The library assumes correctness of the watermark when working
+in event time.
+
+To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
+*correctness of the watermark*, and considers as *late* any element whose timestamp is smaller than that of the last
+seen watermark. Late elements are not processed further, but they can be redirected to a dedicated
+[side output]({{ site.baseurl }}/dev/stream/side_output.html).
+
+To access the stream of late elements, you first need to register an output tag for them by calling
+`.withLateDataOutputTag(OutputTag)` on the `PatternStream` returned by `CEP.pattern(...)`. If you do not do
+so, late elements are silently dropped. Then, after defining the main output with `select()` or `flatSelect()`,
+you can get the side-output stream by calling `.getSideOutput(OutputTag)` on the same `PatternStream`, passing
+the output tag that was given to `.withLateDataOutputTag(OutputTag)`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
+
+PatternStream<T> patternStream = CEP.pattern(...)
+ .withLateDataOutputTag(lateOutputTag);
+
+// main output with matches
+DataStream<O> result = patternStream.select(...);
+
+// side output containing the late events
+DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val lateOutputTag = OutputTag[T]("late-data")
+
+val patternStream: PatternStream[T] = CEP.pattern(...)
+ .withLateDataOutputTag(lateOutputTag)
+
+// main output with matches
+val result = patternStream.select(...)
+
+// side output containing the late events
+val lateStream = patternStream.getSideOutput(lateOutputTag)
+{% endhighlight %}
+</div>
+</div>
+
## Examples
The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 6207049..fb09c15 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -22,12 +22,13 @@ import java.util.{Map => JMap}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.scala.{asScalaStream, _}
+import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flink.types.{Either => FEither}
import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
import java.lang.{Long => JLong}
+import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern
@@ -45,8 +46,23 @@ import scala.collection.mutable
*/
class PatternStream[T](jPatternStream: JPatternStream[T]) {
+ private[flink] var lateDataOutputTag: OutputTag[T] = null
+
private[flink] def wrappedPatternStream = jPatternStream
+
+ /**
+ * Send late arriving data to the side output identified by the given [[OutputTag]]. The
+ * CEP library assumes correctness of the watermark, so an element is considered late if its
+ * timestamp is smaller than the last received watermark.
+ */
+ @PublicEvolving
+ def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = {
+ jPatternStream.withLateDataOutputTag(outputTag)
+ lateDataOutputTag = outputTag
+ this
+ }
+
def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
@@ -93,7 +109,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
- jPatternStream.getPattern())
+ jPatternStream.getPattern(),
+ lateDataOutputTag)
val cleanedSelect = cleanClosure(patternSelectFunction)
val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -158,7 +175,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
: DataStream[Either[L, R]] = {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
- jPatternStream.getPattern()
+ jPatternStream.getPattern(),
+ lateDataOutputTag
)
val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -317,6 +335,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
}
+
+ /**
+ * Gets the [[DataStream]] that contains the elements that are emitted from an operation
+ * into the side output with the given [[OutputTag]].
+ *
+ * @param tag The tag identifying a specific side output.
+ */
+ @PublicEvolving
+ def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
+ asScalaStream(jPatternStream.getSideOutput(tag))
+ }
}
object PatternStream {
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index efcd16c..87666a5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -27,8 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.cep.operator.CEPOperatorUtils;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
import java.util.Map;
@@ -50,6 +53,19 @@ public class PatternStream<T> {
private final Pattern<T, ?> pattern;
+ /**
+ * A reference to the created pattern stream, used to get
+ * the registered side outputs, e.g. the late-elements side output.
+ */
+ private SingleOutputStreamOperator<?> patternStream;
+
+ /**
+ * {@link OutputTag} to use for late arriving events. Elements whose
+ * timestamp is smaller than the last seen watermark are considered late
+ * and will be emitted to this output, if it is set.
+ */
+ private OutputTag<T> lateDataOutputTag;
+
PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
this.inputStream = inputStream;
this.pattern = pattern;
@@ -64,6 +80,22 @@ public class PatternStream<T> {
}
/**
+ * Send late arriving data to the side output identified by the given {@link OutputTag}. The
+ * CEP library assumes correctness of the watermark, so an element is considered late if its
+ * timestamp is smaller than the last received watermark.
+ */
+ public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) {
+ Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+ Preconditions.checkArgument(lateDataOutputTag == null,
+ "The late side output tag has already been initialized to " + lateDataOutputTag + ".");
+ Preconditions.checkArgument(patternStream == null,
+ "The late side output tag has to be set before calling select() or flatSelect().");
+
+ this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
+ return this;
+ }
+
+ /**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
* exactly one resulting element.
@@ -74,7 +106,7 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements from the pattern select
* function.
*/
- public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
+ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
@@ -102,8 +134,10 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements from the pattern select
* function.
*/
- public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
- DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+ public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
+ SingleOutputStreamOperator<Map<String, T>> patternStream =
+ CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+ this.patternStream = patternStream;
return patternStream.map(
new PatternSelectMapper<>(
@@ -129,11 +163,13 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements or the resulting timeout
* elements wrapped in an {@link Either} type.
*/
- public <L, R> DataStream<Either<L, R>> select(
+ public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
final PatternTimeoutFunction<T, L> patternTimeoutFunction,
final PatternSelectFunction<T, R> patternSelectFunction) {
- DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+ SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
+ CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
+ this.patternStream = patternStream;
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
@@ -174,7 +210,7 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
- public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+ public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
// we have to extract the output type from the provided pattern selection function manually
// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
@@ -201,8 +237,10 @@ public class PatternStream<T> {
* @return {@link DataStream} which contains the resulting elements from the pattern flat select
* function.
*/
- public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
- DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+ public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
+ SingleOutputStreamOperator<Map<String, T>> patternStream =
+ CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+ this.patternStream = patternStream;
return patternStream.flatMap(
new PatternFlatSelectMapper<>(
@@ -229,11 +267,13 @@ public class PatternStream<T> {
* function or the resulting timeout events from the pattern flat timeout function wrapped in an
* {@link Either} type.
*/
- public <L, R> DataStream<Either<L, R>> flatSelect(
+ public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
- DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+ SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
+ CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
+ this.patternStream = patternStream;
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
@@ -264,6 +304,18 @@ public class PatternStream<T> {
}
/**
+ * Gets the {@link DataStream} that contains the elements that are emitted from an operation
+ * into the side output with the given {@link OutputTag}.
+ *
+ * @param sideOutputTag The tag identifying a specific side output.
+ */
+ public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
+ Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
+ "To have the late element side output, you have to first define the main output using select() or flatSelect().");
+ return patternStream.getSideOutput(sideOutputTag);
+ }
+
+ /**
* Wrapper for a {@link PatternSelectFunction}.
*
* @param <T> Type of the input elements
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3e18660..b232dbb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -89,6 +90,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
private final NFACompiler.NFAFactory<IN> nfaFactory;
/**
+ * {@link OutputTag} to use for late arriving events. Elements whose
+ * timestamp is smaller than the last seen watermark are considered late
+ * and will be emitted to this output, if it is set.
+ */
+ private final OutputTag<IN> lateDataOutputTag;
+
+ /**
+ * The last seen watermark. This will be used to
+ * decide if an incoming element is late or not.
+ */
+ private long lastWatermark;
+
+ /**
* A flag used in the case of migration that indicates if
* we are restoring from an old keyed or non-keyed operator.
*/
@@ -100,6 +114,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
final KeySelector<IN, KEY> keySelector,
final TypeSerializer<KEY> keySerializer,
final NFACompiler.NFAFactory<IN> nfaFactory,
+ final OutputTag<IN> lateDataOutputTag,
final boolean migratingFromOldKeyedOperator) {
this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
@@ -107,11 +122,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
this.keySelector = Preconditions.checkNotNull(keySelector);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
- this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
- }
- public TypeSerializer<IN> getInputSerializer() {
- return inputSerializer;
+ this.lateDataOutputTag = lateDataOutputTag;
+ this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
}
@Override
@@ -159,6 +172,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
// 3) advance the time to the current watermark, so that expired patterns are discarded.
// 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
// have state to be used later.
+ // 5) update the last seen watermark.
// STEP 1
PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
@@ -180,6 +194,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
if (priorityQueue.isEmpty() && nfa.isEmpty()) {
watermarkCallbackService.unregisterKeyFromWatermarkCallback(key);
}
+
+ // STEP 5
+ updateLastSeenWatermark(watermark);
}
},
keySerializer
@@ -196,19 +213,45 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
updateNFA(nfa);
} else {
- getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
- PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ // In event-time processing we assume correctness of the watermark.
+ // Events with timestamp smaller than the last seen watermark are considered late.
+ // Late events are put in a dedicated side output, if the user has specified one.
+
+ if (element.getTimestamp() >= lastWatermark) {
- // event time processing
- // we have to buffer the elements until we receive the proper watermark
- if (getExecutionConfig().isObjectReuseEnabled()) {
- // copy the StreamRecord so that it cannot be changed
- priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+ // we have an event with a valid timestamp, so
+ // we buffer it until we receive the proper watermark.
+
+ getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
+
+ PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+ if (getExecutionConfig().isObjectReuseEnabled()) {
+ // copy the StreamRecord so that it cannot be changed
+ priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+ } else {
+ priorityQueue.offer(element);
+ }
+ updatePriorityQueue(priorityQueue);
} else {
- priorityQueue.offer(element);
+ sideOutputLateElement(element);
}
- updatePriorityQueue(priorityQueue);
+ }
+ }
+
+ private void updateLastSeenWatermark(Watermark watermark) {
+ this.lastWatermark = watermark.getTimestamp();
+ }
+
+ /**
+ * Puts the provided late element in the dedicated side output,
+ * if the user has specified one.
+ *
+ * @param element The late element.
+ */
+ private void sideOutputLateElement(StreamRecord<IN> element) {
+ if (lateDataOutputTag != null) {
+ output.collect(lateDataOutputTag, element);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index a5eef45..c12680f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -33,7 +33,9 @@ import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
import java.util.Map;
@@ -46,7 +48,7 @@ public class CEPOperatorUtils {
* @return Data stream containing fully matched event sequences stored in a {@link Map}. The
* events are indexed by their associated names of the pattern.
*/
- public static <K, T> DataStream<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+ public static <K, T> SingleOutputStreamOperator<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
// check whether we use processing time
@@ -55,7 +57,7 @@ public class CEPOperatorUtils {
// compile our pattern into a NFAFactory to instantiate NFAs later on
final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
- final DataStream<Map<String, T>> patternStream;
+ final SingleOutputStreamOperator<Map<String, T>> patternStream;
if (inputStream instanceof KeyedStream) {
// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
@@ -73,6 +75,7 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
+ lateDataOutputTag,
true));
} else {
@@ -88,6 +91,7 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
+ lateDataOutputTag,
false
)).forceNonParallel();
}
@@ -104,7 +108,8 @@ public class CEPOperatorUtils {
* @return Data stream containing fully matched and partially matched event sequences wrapped in
* a {@link Either} instance.
*/
- public static <K, T> DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+ public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(
+ DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
@@ -114,7 +119,7 @@ public class CEPOperatorUtils {
// compile our pattern into a NFAFactory to instantiate NFAs later on
final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true);
- final DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream;
+ final SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream;
final TypeInformation<Map<String, T>> rightTypeInfo = (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class);
final TypeInformation<Tuple2<Map<String, T>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO);
@@ -130,12 +135,13 @@ public class CEPOperatorUtils {
patternStream = keyedStream.transform(
"TimeoutKeyedCEPPatternOperator",
eitherTypeInformation,
- new TimeoutKeyedCEPPatternOperator<T, K>(
+ new TimeoutKeyedCEPPatternOperator<>(
inputSerializer,
isProcessingTime,
keySelector,
keySerializer,
nfaFactory,
+ lateDataOutputTag,
true));
} else {
@@ -151,6 +157,7 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
+ lateDataOutputTag,
false
)).forceNonParallel();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 21cee23..532bba3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
import java.util.Collection;
import java.util.Iterator;
@@ -46,9 +47,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
+ OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index c6fba55..933bfd3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
import java.util.Collection;
import java.util.Map;
@@ -46,9 +47,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
+ OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 42117ee..a5015df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -47,6 +48,9 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
private String resultPath;
private String expected;
+ private String lateEventPath;
+ private String expectedLateEvents;
+
@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();
@@ -54,11 +58,15 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
public void before() throws Exception {
resultPath = tempFolder.newFile().toURI().toString();
expected = "";
+
+ lateEventPath = tempFolder.newFile().toURI().toString();
+ expectedLateEvents = "";
}
@After
public void after() throws Exception {
compareResultsByLinesInMemory(expected, resultPath);
+ compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
}
/**
@@ -572,4 +580,99 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
env.execute();
}
+
+ @Test
+ public void testLateEventSideOutput() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+
+ // (event, event-time timestamp) pairs; every "end" event also emits a watermark at its own timestamp
+ DataStream<Event> input = env.fromElements(
+ Tuple2.of(new Event(1, "start", 1.0), 1L),
+ Tuple2.of(new Event(2, "middle", 2.0), 2L),
+ Tuple2.of(new Event(3, "end", 3.0), 15L),
+ Tuple2.of(new Event(4, "middle", 5.0), 7L),
+ Tuple2.of(new Event(6, "start", 1.0), 21L),
+ Tuple2.of(new Event(5, "middle", 5.0), 10L),
+ Tuple2.of(new Event(7, "middle", 2.0), 22L),
+ Tuple2.of(new Event(8, "end", 3.0), 23L)
+ ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+
+ @Override
+ public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
+ return element.f1;
+ }
+
+ @Override
+ public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
+ return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
+ }
+
+ }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+
+ @Override
+ public Event map(Tuple2<Event, Long> value) throws Exception {
+ return value.f0;
+ }
+ });
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("middle");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
+
+ PatternStream<Event> patternStream = CEP.pattern(input, pattern).withLateDataOutputTag(lateOutputTag);
+ DataStream<String> result = patternStream.select(
+ new PatternSelectFunction<Event, String>() {
+
+ @Override
+ public String select(Map<String, Event> pattern) {
+ StringBuilder builder = new StringBuilder();
+
+ builder.append(pattern.get("start").getId()).append(",")
+ .append(pattern.get("middle").getId()).append(",")
+ .append(pattern.get("end").getId());
+ return builder.toString();
+ }
+ }
+ );
+
+ DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
+
+ // write only the ids of the late events, so the side output can be compared line by line in after()
+ lateEvents.map(
+ new MapFunction<Event, Integer>() {
+
+ @Override
+ public Integer map(Event value) throws Exception {
+ return value.getId();
+ }
+ }
+ ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
+
+ // events 4 (ts 7) and 5 (ts 10) arrive after the watermark at 15 (emitted by event 3), so they are late
+ expectedLateEvents = "4\n5";
+
+ result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+ expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8"; // none of the matches contains the late events 4 or 5
+ env.execute();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index b83eb3c..4e05fcf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,6 +103,7 @@ public class CEPMigration11to13Test {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -177,6 +178,7 @@ public class CEPMigration11to13Test {
keySelector,
ByteSerializer.INSTANCE,
new NFAFactory(),
+ null,
false),
keySelector,
BasicTypeInfo.BYTE_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index dbe4230..8249535 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -118,6 +118,7 @@ public class CEPMigration12to13Test {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -233,6 +234,7 @@ public class CEPMigration12to13Test {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -353,6 +355,7 @@ public class CEPMigration12to13Test {
keySelector,
IntSerializer.INSTANCE,
new SinglePatternNFAFactory(),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 726c8b8..d599ec9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -225,6 +225,7 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(true),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -482,6 +483,7 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
+ null,
true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 2c86648..a048183 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -346,6 +346,7 @@ public class CEPRescalingTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
new NFAFactory(),
+ null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO,