Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/31 10:30:54 UTC

flink git commit: [FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side output.

Repository: flink
Updated Branches:
  refs/heads/master 193224017 -> 48890285d


[FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side output.

With this, the CEP library assumes correctness of the watermark
and considers as late any event that arrives with a timestamp
smaller than that of the last seen watermark. Late events are
dropped by default, but the user can specify a side output to
send them to.
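
As a rough illustration of the rule described above (not code from this
commit), the lateness check amounts to a single comparison against the last
seen watermark. The class and method names below are hypothetical. The sketch
follows the `>=` check visible in AbstractKeyedCEPPatternOperator, so an
element whose timestamp equals the watermark is not treated as late.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: mirrors the comparison, not the CEP operator itself. */
class LatenessRule {

    /** An element is late if its timestamp is strictly smaller than the last seen watermark. */
    static boolean isLate(long timestamp, long lastWatermark) {
        return timestamp < lastWatermark;
    }

    /** Collects the timestamps that would be classified as late for a fixed watermark. */
    static List<Long> lateTimestamps(long lastWatermark, long... timestamps) {
        List<Long> late = new ArrayList<>();
        for (long ts : timestamps) {
            if (isLate(ts, lastWatermark)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Hypothetical input: the last seen watermark is 15.
        System.out.println(lateTimestamps(15L, 7L, 10L, 15L, 21L)); // prints [7, 10]
    }
}
```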


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48890285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48890285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48890285

Branch: refs/heads/master
Commit: 48890285d4b1c285bebb971ae0dbfc310c6fcc0e
Parents: 1932240
Author: kl0u <kk...@gmail.com>
Authored: Thu Mar 23 19:01:15 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Fri Mar 31 11:16:50 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  53 ++++++++++
 .../apache/flink/cep/scala/PatternStream.scala  |  37 ++++++-
 .../org/apache/flink/cep/PatternStream.java     |  72 +++++++++++--
 .../AbstractKeyedCEPPatternOperator.java        |  69 ++++++++++---
 .../flink/cep/operator/CEPOperatorUtils.java    |  17 ++-
 .../cep/operator/KeyedCEPPatternOperator.java   |   4 +-
 .../TimeoutKeyedCEPPatternOperator.java         |   4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 103 +++++++++++++++++++
 .../cep/operator/CEPMigration11to13Test.java    |   2 +
 .../cep/operator/CEPMigration12to13Test.java    |   3 +
 .../flink/cep/operator/CEPOperatorTest.java     |   2 +
 .../flink/cep/operator/CEPRescalingTest.java    |   1 +
 12 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index bb704c7..643d6ee 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -777,6 +777,59 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
 </div>
 </div>
 
+### Handling Lateness in Event Time
+
+In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order
+when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending 
+order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller 
+than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order. 
+
+<span class="label label-danger">Attention</span> The library assumes correctness of the watermark when working 
+in event time.
+
+To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
+*correctness of the watermark* and considers as *late* any element whose timestamp is smaller than that of the last
+seen watermark. Late elements are not processed further, but they can be redirected to a dedicated
+[side output]({{ site.baseurl }}/dev/stream/side_output.html).
+
+To access the stream of late elements, first request the late data by calling `.withLateDataOutputTag(OutputTag)`
+on the `PatternStream` returned by the `CEP.pattern(...)` call. If you do not do so, the late elements will be
+silently dropped. Then, after defining the main output with `select(...)` or `flatSelect(...)`, get the
+side-output stream by calling `.getSideOutput(OutputTag)` on the same `PatternStream`, passing as argument the
+output tag used in `.withLateDataOutputTag(OutputTag)`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
+
+PatternStream<T> patternStream = CEP.pattern(...)
+    .withLateDataOutputTag(lateOutputTag);
+
+// main output with matches
+DataStream<O> result = patternStream.select(...);
+
+// side output containing the late events
+DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val lateOutputTag = OutputTag[T]("late-data")
+
+val patternStream: PatternStream[T] = CEP.pattern(...)
+    .withLateDataOutputTag(lateOutputTag)
+
+// main output with matches
+val result = patternStream.select(...)
+
+// side output containing the late events
+val lateStream = patternStream.getSideOutput(lateOutputTag)
+{% endhighlight %}
+</div>
+</div>
+
 ## Examples
 
 The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`.
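
The buffering described in the "Handling Lateness in Event Time" section added
by this hunk can be sketched without Flink as follows. This is a minimal
simulation under stated assumptions, not the operator's code: the class name
and its fields are hypothetical, timestamps stand in for whole elements, the
drain comparison (`<`) follows the wording of the docs, and the initial
watermark of `Long.MIN_VALUE` is a choice made for the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for the event-time buffering; timestamps represent elements. */
class EventTimeBufferSketch {
    // Min-heap: the smallest timestamp is always at the head.
    final PriorityQueue<Long> buffer = new PriorityQueue<>();
    final List<Long> processed = new ArrayList<>();
    final List<Long> late = new ArrayList<>();
    // Assumption of this sketch: nothing is late before the first watermark.
    long lastWatermark = Long.MIN_VALUE;

    void onElement(long timestamp) {
        if (timestamp >= lastWatermark) {
            buffer.offer(timestamp); // valid timestamp: wait for the next watermark
        } else {
            late.add(timestamp);     // late: goes to the "side output" of this sketch
        }
    }

    void onWatermark(long watermark) {
        // Drain in ascending timestamp order everything below the watermark.
        while (!buffer.isEmpty() && buffer.peek() < watermark) {
            processed.add(buffer.poll());
        }
        lastWatermark = watermark;
    }

    public static void main(String[] args) {
        EventTimeBufferSketch op = new EventTimeBufferSketch();
        op.onElement(3); op.onElement(1); op.onElement(2);
        op.onWatermark(5);
        op.onElement(4);                 // smaller than watermark 5, so late
        op.onElement(9); op.onElement(7);
        op.onWatermark(10);
        System.out.println(op.processed); // prints [1, 2, 3, 7, 9]
        System.out.println(op.late);      // prints [4]
    }
}
```

Elements arriving out of order between two watermarks come out sorted, and the
element that arrives behind the watermark never reaches the main path.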

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 6207049..fb09c15 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -22,12 +22,13 @@ import java.util.{Map => JMap}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.scala.{asScalaStream, _}
+import org.apache.flink.util.{Collector, OutputTag}
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
 
@@ -45,8 +46,23 @@ import scala.collection.mutable
   */
 class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
+  private[flink] var lateDataOutputTag: OutputTag[T] = null
+
   private[flink] def wrappedPatternStream = jPatternStream
 
+
+  /**
+    * Send late arriving data to the side output identified by the given {@link OutputTag}. The
+    * CEP library assumes correctness of the watermark, so an element is considered late if its
+    * timestamp is smaller than the last received watermark.
+    */
+  @PublicEvolving
+  def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = {
+    jPatternStream.withLateDataOutputTag(outputTag)
+    lateDataOutputTag = outputTag
+    this
+  }
+
   def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
 
   def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
@@ -93,7 +109,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern())
+      jPatternStream.getPattern(),
+      lateDataOutputTag)
 
     val cleanedSelect = cleanClosure(patternSelectFunction)
     val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -158,7 +175,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   : DataStream[Either[L, R]] = {
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern()
+      jPatternStream.getPattern(),
+      lateDataOutputTag
     )
 
     val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -317,6 +335,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
   }
+
+  /**
+    * Gets the {@link DataStream} that contains the elements that are emitted from an operation
+    * into the side output with the given {@link OutputTag}.
+    *
+    * @param tag The tag identifying a specific side output.
+    */
+  @PublicEvolving
+  def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
+    asScalaStream(jPatternStream.getSideOutput(tag))
+  }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index efcd16c..87666a5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -27,8 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.operator.CEPOperatorUtils;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
 
@@ -50,6 +53,19 @@ public class PatternStream<T> {
 
 	private final Pattern<T, ?> pattern;
 
+	/**
+	 * A reference to the created pattern stream used to get
+	 * the registered side outputs, e.g. the side output for late elements.
+	 */
+	private SingleOutputStreamOperator<?> patternStream;
+
+	/**
+	 * {@link OutputTag} to use for late arriving events. Elements whose timestamp
+	 * is smaller than the last seen watermark will be emitted to this side output,
+	 * if one has been specified.
+	 */
+	private OutputTag<T> lateDataOutputTag;
+
 	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
 		this.inputStream = inputStream;
 		this.pattern = pattern;
@@ -64,6 +80,22 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Send late arriving data to the side output identified by the given {@link OutputTag}. The
+	 * CEP library assumes correctness of the watermark, so an element is considered late if its
+	 * timestamp is smaller than the last received watermark.
+	 */
+	public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) {
+		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
+		Preconditions.checkArgument(lateDataOutputTag == null,
+				"The late side output tag has already been initialized to " + lateDataOutputTag + ".");
+		Preconditions.checkArgument(patternStream == null,
+				"The late side output tag has to be set before calling select() or flatSelect().");
+
+		this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
+		return this;
+	}
+
+	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
 	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
 	 * exactly one resulting element.
@@ -74,7 +106,7 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern select
 	 *         function.
 	 */
-	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
+	public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
 
@@ -102,8 +134,10 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern select
 	 *         function.
 	 */
-	public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
-		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+	public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
+		SingleOutputStreamOperator<Map<String, T>> patternStream =
+				CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+		this.patternStream = patternStream;
 
 		return patternStream.map(
 			new PatternSelectMapper<>(
@@ -129,11 +163,13 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements or the resulting timeout
 	 * elements wrapped in an {@link Either} type.
 	 */
-	public <L, R> DataStream<Either<L, R>> select(
+	public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
 		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
 		final PatternSelectFunction<T, R> patternSelectFunction) {
 
-		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+		SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
+				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
+		this.patternStream = patternStream;
 
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternTimeoutFunction,
@@ -174,7 +210,7 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
-	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
 		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
@@ -201,8 +237,10 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
-	public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
-		DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern);
+	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
+		SingleOutputStreamOperator<Map<String, T>> patternStream =
+				CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+		this.patternStream = patternStream;
 
 		return patternStream.flatMap(
 			new PatternFlatSelectMapper<>(
@@ -229,11 +267,13 @@ public class PatternStream<T> {
 	 * function or the resulting timeout events from the pattern flat timeout function wrapped in an
 	 * {@link Either} type.
 	 */
-	public <L, R> DataStream<Either<L, R>> flatSelect(
+	public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
 		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
 		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
+		SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream =
+				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
+		this.patternStream = patternStream;
 
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatTimeoutFunction,
@@ -264,6 +304,18 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
+	 * into the side output with the given {@link OutputTag}.
+	 *
+	 * @param sideOutputTag The tag identifying a specific side output.
+	 */
+	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
+		Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
+				"To have the late element side output, you have to first define the main output using select() or flatSelect().");
+		return patternStream.getSideOutput(sideOutputTag);
+	}
+
+	/**
 	 * Wrapper for a {@link PatternSelectFunction}.
 	 *
 	 * @param <T> Type of the input elements
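
The preconditions added to PatternStream.java above imply a call order: the
late-data tag must be set once, before select() or flatSelect(), and
getSideOutput() only works afterwards. A minimal sketch of that contract,
using a hypothetical class with a String in place of OutputTag and a boolean
in place of the patternStream field, and assuming (as with Flink's
Preconditions) that checkNotNull throws NullPointerException and
checkArgument throws IllegalArgumentException:

```java
/** Hypothetical stand-in for the call-order checks; not the Flink class. */
class CallOrderSketch {
    private String lateTag;          // stands in for OutputTag<T> lateDataOutputTag
    private boolean operatorCreated; // stands in for the patternStream field being non-null

    CallOrderSketch withLateDataOutputTag(String tag) {
        if (tag == null) {
            throw new NullPointerException("Side output tag must not be null.");
        }
        if (lateTag != null) {
            throw new IllegalArgumentException("The late side output tag has already been set.");
        }
        if (operatorCreated) {
            throw new IllegalArgumentException("The tag has to be set before select() or flatSelect().");
        }
        lateTag = tag;
        return this;
    }

    /** Defining the main output is what creates the underlying operator. */
    void select() {
        operatorCreated = true;
    }

    String getSideOutput(String tag) {
        if (!operatorCreated) {
            throw new NullPointerException("Define the main output with select() or flatSelect() first.");
        }
        return "side-output:" + tag;
    }

    public static void main(String[] args) {
        CallOrderSketch stream = new CallOrderSketch().withLateDataOutputTag("late-data");
        stream.select();                                       // main output first
        System.out.println(stream.getSideOutput("late-data")); // then the side output
    }
}
```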

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3e18660..b232dbb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -89,6 +90,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	private final NFACompiler.NFAFactory<IN> nfaFactory;
 
 	/**
+	 * {@link OutputTag} to use for late arriving events. Elements whose timestamp
+	 * is smaller than the last seen watermark will be emitted to this side output,
+	 * if one has been specified.
+	 */
+	private final OutputTag<IN> lateDataOutputTag;
+
+	/**
+	 * The last seen watermark. This will be used to
+	 * decide if an incoming element is late or not.
+	 */
+	private long lastWatermark;
+
+	/**
 	 * A flag used in the case of migration that indicates if
 	 * we are restoring from an old keyed or non-keyed operator.
 	 */
@@ -100,6 +114,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
+			final OutputTag<IN> lateDataOutputTag,
 			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
@@ -107,11 +122,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		this.keySelector = Preconditions.checkNotNull(keySelector);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
-		this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
-	}
 
-	public TypeSerializer<IN> getInputSerializer() {
-		return inputSerializer;
+		this.lateDataOutputTag = lateDataOutputTag;
+		this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
 	}
 
 	@Override
@@ -159,6 +172,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 					// 3) advance the time to the current watermark, so that expired patterns are discarded.
 					// 4) update the stored state for the key, by only storing the new NFA and priority queue iff they
 					//		have state to be used later.
+					// 5) update the last seen watermark.
 
 					// STEP 1
 					PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
@@ -180,6 +194,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 					if (priorityQueue.isEmpty() && nfa.isEmpty()) {
 						watermarkCallbackService.unregisterKeyFromWatermarkCallback(key);
 					}
+
+					// STEP 5
+					updateLastSeenWatermark(watermark);
 				}
 			},
 			keySerializer
@@ -196,19 +213,45 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 			updateNFA(nfa);
 
 		} else {
-			getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
 
-			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+			// In event-time processing we assume correctness of the watermark.
+			// Events with timestamp smaller than the last seen watermark are considered late.
+			// Late events are put in a dedicated side output, if the user has specified one.
+
+			if (element.getTimestamp() >= lastWatermark) {
 
-			// event time processing
-			// we have to buffer the elements until we receive the proper watermark
-			if (getExecutionConfig().isObjectReuseEnabled()) {
-				// copy the StreamRecord so that it cannot be changed
-				priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+				// we have an event with a valid timestamp, so
+				// we buffer it until we receive the proper watermark.
+
+				getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
+
+				PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
+				if (getExecutionConfig().isObjectReuseEnabled()) {
+					// copy the StreamRecord so that it cannot be changed
+					priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp()));
+				} else {
+					priorityQueue.offer(element);
+				}
+				updatePriorityQueue(priorityQueue);
 			} else {
-				priorityQueue.offer(element);
+				sideOutputLateElement(element);
 			}
-			updatePriorityQueue(priorityQueue);
+		}
+	}
+
+	private void updateLastSeenWatermark(Watermark watermark) {
+		this.lastWatermark = watermark.getTimestamp();
+	}
+
+	/**
+	 * Puts the provided late element in the dedicated side output,
+	 * if the user has specified one.
+	 *
+	 * @param element The late element.
+	 */
+	private void sideOutputLateElement(StreamRecord<IN> element) {
+		if (lateDataOutputTag != null) {
+			output.collect(lateDataOutputTag, element);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index a5eef45..c12680f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -33,7 +33,9 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Map;
 
@@ -46,7 +48,7 @@ public class CEPOperatorUtils {
 	 * @return Data stream containing fully matched event sequences stored in a {@link Map}. The
 	 * events are indexed by their associated names of the pattern.
 	 */
-	public static <K, T> DataStream<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+	public static <K, T> SingleOutputStreamOperator<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
 		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
 		// check whether we use processing time
@@ -55,7 +57,7 @@ public class CEPOperatorUtils {
 		// compile our pattern into a NFAFactory to instantiate NFAs later on
 		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false);
 
-		final DataStream<Map<String, T>> patternStream;
+		final SingleOutputStreamOperator<Map<String, T>> patternStream;
 
 		if (inputStream instanceof KeyedStream) {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
@@ -73,6 +75,7 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
+					lateDataOutputTag,
 					true));
 		} else {
 
@@ -88,6 +91,7 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
+					lateDataOutputTag,
 					false
 				)).forceNonParallel();
 		}
@@ -104,7 +108,8 @@ public class CEPOperatorUtils {
 	 * @return Data stream containing fully matched and partially matched event sequences wrapped in
 	 * a {@link Either} instance.
 	 */
-	public static <K, T> DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+	public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(
+			DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
 
 		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
@@ -114,7 +119,7 @@ public class CEPOperatorUtils {
 		// compile our pattern into a NFAFactory to instantiate NFAs later on
 		final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true);
 
-		final DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream;
+		final SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream;
 
 		final TypeInformation<Map<String, T>> rightTypeInfo = (TypeInformation<Map<String, T>>) (TypeInformation<?>)  TypeExtractor.getForClass(Map.class);
 		final TypeInformation<Tuple2<Map<String, T>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO);
@@ -130,12 +135,13 @@ public class CEPOperatorUtils {
 			patternStream = keyedStream.transform(
 				"TimeoutKeyedCEPPatternOperator",
 				eitherTypeInformation,
-				new TimeoutKeyedCEPPatternOperator<T, K>(
+				new TimeoutKeyedCEPPatternOperator<>(
 					inputSerializer,
 					isProcessingTime,
 					keySelector,
 					keySerializer,
 					nfaFactory,
+					lateDataOutputTag,
 					true));
 		} else {
 
@@ -151,6 +157,7 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
+					lateDataOutputTag,
 					false
 				)).forceNonParallel();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 21cee23..532bba3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -46,9 +47,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
+			OutputTag<IN> lateDataOutputTag,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index c6fba55..933bfd3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Map;
@@ -46,9 +47,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
+			OutputTag<IN> lateDataOutputTag,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 42117ee..a5015df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -47,6 +48,9 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 	private String resultPath;
 	private String expected;
 
+	private String lateEventPath;
+	private String expectedLateEvents;
+
 	@Rule
 	public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -54,11 +58,15 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 	public void before() throws Exception {
 		resultPath = tempFolder.newFile().toURI().toString();
 		expected = "";
+
+		lateEventPath = tempFolder.newFile().toURI().toString();
+		expectedLateEvents = "";
 	}
 
 	@After
 	public void after() throws Exception {
 		compareResultsByLinesInMemory(expected, resultPath);
+		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
 	}
 
 	/**
@@ -572,4 +580,99 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
+
+	@Test
+	public void testLateEventSideOutput() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		env.setParallelism(1);
+
+		// (Event, timestamp)
+		DataStream<Event> input = env.fromElements(
+				Tuple2.of(new Event(1, "start", 1.0), 1L),
+				Tuple2.of(new Event(2, "middle", 2.0), 2L),
+				Tuple2.of(new Event(3, "end", 3.0), 15L),
+				Tuple2.of(new Event(4, "middle", 5.0), 7L),
+				Tuple2.of(new Event(6, "start", 1.0), 21L),
+				Tuple2.of(new Event(5, "middle", 5.0), 10L),
+				Tuple2.of(new Event(7, "middle", 2.0), 22L),
+				Tuple2.of(new Event(8, "end", 3.0), 23L)
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+
+			@Override
+			public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
+				return element.f1;
+			}
+
+			@Override
+			public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
+				return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
+			}
+
+		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+
+			@Override
+			public Event map(Tuple2<Event, Long> value) throws Exception {
+				return value.f0;
+			}
+		});
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("middle");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
+
+		PatternStream<Event> patternStream = CEP.pattern(input, pattern).withLateDataOutputTag(lateOutputTag);
+		DataStream<String> result = patternStream.select(
+				new PatternSelectFunction<Event, String>() {
+
+					@Override
+					public String select(Map<String, Event> pattern) {
+						StringBuilder builder = new StringBuilder();
+
+						builder.append(pattern.get("start").getId()).append(",")
+								.append(pattern.get("middle").getId()).append(",")
+								.append(pattern.get("end").getId());
+						return builder.toString();
+					}
+				}
+		);
+
+		DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
+
+		// we just care for the late events in this test.
+		lateEvents.map(
+				new MapFunction<Event, Integer>() {
+
+					@Override
+					public Integer map(Event value) throws Exception {
+						return value.getId();
+					}
+				}
+		).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
+
+		// the expected sequence of late event ids
+		expectedLateEvents = "4\n5";
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+		expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
+		env.execute();
+	}
 }
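
Note on the expected late events in testLateEventSideOutput: the commit
message defines an event as late when its timestamp is smaller than the last
seen watermark, and the test's assigner emits a watermark only after "end"
events. The sketch below replays the test input against that rule in plain
Java, without Flink. The class LateEventSketch and its methods are
hypothetical names used for illustration, and the strict "<" comparison is
taken from the commit message rather than from the operator code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; not part of the Flink code base.
public class LateEventSketch {

    /**
     * Returns the ids of events whose timestamp is smaller than the last
     * seen watermark. A watermark is produced after each "end" event,
     * mirroring the punctuated assigner in the test.
     */
    static List<Integer> lateIds(int[] ids, String[] names, long[] timestamps) {
        List<Integer> late = new ArrayList<>();
        long lastWatermark = Long.MIN_VALUE;
        for (int i = 0; i < ids.length; i++) {
            // Assumption: strict comparison, as worded in the commit message.
            if (timestamps[i] < lastWatermark) {
                late.add(ids[i]);
            }
            // The watermark follows the element and never moves backwards.
            if (names[i].equals("end")) {
                lastWatermark = Math.max(lastWatermark, timestamps[i]);
            }
        }
        return late;
    }

    /** Replays the (id, name, timestamp) triples from testLateEventSideOutput. */
    static List<Integer> lateIdsForTestInput() {
        int[] ids = {1, 2, 3, 4, 6, 5, 7, 8};
        String[] names = {"start", "middle", "end", "middle", "start", "middle", "middle", "end"};
        long[] timestamps = {1L, 2L, 15L, 7L, 21L, 10L, 22L, 23L};
        return lateIds(ids, names, timestamps);
    }

    public static void main(String[] args) {
        System.out.println(lateIdsForTestInput()); // prints [4, 5]
    }
}
```

Event 3 ("end", timestamp 15) advances the watermark to 15, so events 4
(timestamp 7) and 5 (timestamp 10) arrive behind it, which matches
expectedLateEvents = "4\n5" in the test.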

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index b83eb3c..4e05fcf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,6 +103,7 @@ public class CEPMigration11to13Test {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
+								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -177,6 +178,7 @@ public class CEPMigration11to13Test {
 								keySelector,
 								ByteSerializer.INSTANCE,
 								new NFAFactory(),
+								null,
 								false),
 						keySelector,
 						BasicTypeInfo.BYTE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index dbe4230..8249535 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -118,6 +118,7 @@ public class CEPMigration12to13Test {
 					keySelector,
 					IntSerializer.INSTANCE,
 					new NFAFactory(),
+					null,
 					true),
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
@@ -233,6 +234,7 @@ public class CEPMigration12to13Test {
 					keySelector,
 					IntSerializer.INSTANCE,
 					new NFAFactory(),
+					null,
 					true),
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);
@@ -353,6 +355,7 @@ public class CEPMigration12to13Test {
 					keySelector,
 					IntSerializer.INSTANCE,
 					new SinglePatternNFAFactory(),
+					null,
 					true),
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 726c8b8..d599ec9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -225,6 +225,7 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				IntSerializer.INSTANCE,
 				new NFAFactory(true),
+				null,
 				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO);
@@ -482,6 +483,7 @@ public class CEPOperatorTest extends TestLogger {
 			keySelector,
 			IntSerializer.INSTANCE,
 			new NFAFactory(),
+			null,
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 2c86648..a048183 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -346,6 +346,7 @@ public class CEPRescalingTest {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				new NFAFactory(),
+				null,
 				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO,