You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:49 UTC

[3/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().

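[FLINK-6255] [cep] Remove PatternStream.getSideOutput() and PatternStream.sideOutputLateData(), together with the late-data OutputTag plumbing in the CEP operators; late elements are no longer redirected to a side output.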


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ad87f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ad87f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ad87f4

Branch: refs/heads/master
Commit: 05ad87f4ce8c0aea6944feb14bf19795c1fc56c9
Parents: 02ea418
Author: kl0u <kk...@gmail.com>
Authored: Fri May 12 16:01:38 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Wed May 17 14:37:32 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            | 41 +-------
 .../apache/flink/cep/scala/PatternStream.scala  | 35 +------
 .../org/apache/flink/cep/PatternStream.java     | 55 +----------
 .../AbstractKeyedCEPPatternOperator.java        | 24 -----
 .../flink/cep/operator/CEPOperatorUtils.java    |  9 +-
 .../cep/operator/KeyedCEPPatternOperator.java   |  4 +-
 .../TimeoutKeyedCEPPatternOperator.java         |  4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 98 +-------------------
 .../cep/operator/CEPFrom12MigrationTest.java    |  6 --
 .../cep/operator/CEPMigration11to13Test.java    |  2 -
 .../flink/cep/operator/CEPOperatorTest.java     |  2 -
 .../flink/cep/operator/CEPRescalingTest.java    |  1 -
 12 files changed, 13 insertions(+), 268 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index b379615..58e1a0a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -806,46 +806,7 @@ in event time.
 
 To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes 
 *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last 
-seen watermark. Late elements are not further processed but they can be redirected to a [side output]
-({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.
-
-To access the stream of late elements, you first need to specify that you want to get the late data using 
-`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do
-so, the late elements will be silently dropped. Then, you can get the side-output stream using the 
-`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in 
-the `.sideOutputLateData(OutputTag)`:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
-
-PatternStream<T> patternStream = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag);
-
-// main output with matches
-DataStream<O> result = patternStream.select(...)    
-
-// side output containing the late events
-DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val lateOutputTag = OutputTag[T]("late-data")
-
-val patternStream: PatternStream[T] = CEP.pattern(...)
-    .sideOutputLateData(lateOutputTag)
-
-// main output with matches
-val result = patternStream.select(...)
-
-// side output containing the late events
-val lateStream = patternStream.getSideOutput(lateOutputTag)
-{% endhighlight %}
-</div>
-</div>
+seen watermark. Late elements are not further processed.
 
 ## Examples
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d4bc28c..e71439c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
-import org.apache.flink.util.{Collector, OutputTag}
+import org.apache.flink.util.Collector
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
-import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
 
@@ -47,23 +46,8 @@ import scala.collection.mutable
   */
 class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
-  private[flink] var lateDataOutputTag: OutputTag[T] = null
-
   private[flink] def wrappedPatternStream = jPatternStream
 
-
-  /**
-    * Send late arriving data to the side output identified by the given {@link OutputTag}. The
-    * CEP library assumes correctness of the watermark, so an element is considered late if its
-    * timestamp is smaller than the last received watermark.
-    */
-  @PublicEvolving
-  def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = {
-    jPatternStream.sideOutputLateData(outputTag)
-    lateDataOutputTag = outputTag
-    this
-  }
-
   def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
 
   def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
@@ -110,8 +94,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern(),
-      lateDataOutputTag)
+      jPatternStream.getPattern())
 
     val cleanedSelect = cleanClosure(patternSelectFunction)
     val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -176,8 +159,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   : DataStream[Either[L, R]] = {
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern(),
-      lateDataOutputTag
+      jPatternStream.getPattern()
     )
 
     val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -338,17 +320,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
   }
-
-  /**
-    * Gets the {@link DataStream} that contains the elements that are emitted from an operation
-    * into the side output with the given {@link OutputTag}.
-    *
-    * @param tag The tag identifying a specific side output.
-    */
-    @PublicEvolving
-    def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
-      asScalaStream(jPatternStream.getSideOutput(tag))
-    }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 04dff49..5544689 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -54,19 +52,6 @@ public class PatternStream<T> {
 
 	private final Pattern<T, ?> pattern;
 
-	/**
-	 * A reference to the created pattern stream used to get
-	 * the registered side outputs, e.g late elements side output.
-	 */
-	private SingleOutputStreamOperator<?> patternStream;
-
-	/**
-	 * {@link OutputTag} to use for late arriving events. Elements for which
-	 * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
-	 * be emitted to this.
-	 */
-	private OutputTag<T> lateDataOutputTag;
-
 	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
 		this.inputStream = inputStream;
 		this.pattern = pattern;
@@ -81,22 +66,6 @@ public class PatternStream<T> {
 	}
 
 	/**
-	 * Send late arriving data to the side output identified by the given {@link OutputTag}. The
-	 * CEP library assumes correctness of the watermark, so an element is considered late if its
-	 * timestamp is smaller than the last received watermark.
-	 */
-	public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
-		Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
-		Preconditions.checkArgument(lateDataOutputTag == null,
-				"The late side output tag has already been initialized to " + lateDataOutputTag + ".");
-		Preconditions.checkArgument(patternStream == null,
-				"The late side output tag has to be set before calling select() or flatSelect().");
-
-		this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
-		return this;
-	}
-
-	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
 	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
 	 * exactly one resulting element.
@@ -137,8 +106,7 @@ public class PatternStream<T> {
 	 */
 	public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
 		SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-				CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
-		this.patternStream = patternStream;
+				CEPOperatorUtils.createPatternStream(inputStream, pattern);
 
 		return patternStream.map(
 			new PatternSelectMapper<>(
@@ -169,8 +137,7 @@ public class PatternStream<T> {
 		final PatternSelectFunction<T, R> patternSelectFunction) {
 
 		SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
-				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
-		this.patternStream = patternStream;
+				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
 
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternTimeoutFunction,
@@ -240,8 +207,7 @@ public class PatternStream<T> {
 	 */
 	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
 		SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
-				CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
-		this.patternStream = patternStream;
+				CEPOperatorUtils.createPatternStream(inputStream, pattern);
 
 		return patternStream.flatMap(
 			new PatternFlatSelectMapper<>(
@@ -273,8 +239,7 @@ public class PatternStream<T> {
 		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
 		SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
-				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
-		this.patternStream = patternStream;
+				CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
 
 		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatTimeoutFunction,
@@ -305,18 +270,6 @@ public class PatternStream<T> {
 	}
 
 	/**
-	 * Gets the {@link DataStream} that contains the elements that are emitted from an operation
-	 * into the side output with the given {@link OutputTag}.
-	 *
-	 * @param sideOutputTag The tag identifying a specific side output.
-	 */
-	public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
-		Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
-				"To have the late element side output, you have to first define the main output using select() or flatSelect().");
-		return patternStream.getSideOutput(sideOutputTag);
-	}
-
-	/**
 	 * Wrapper for a {@link PatternSelectFunction}.
 	 *
 	 * @param <T> Type of the input elements

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3afe397..7068bc4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Migration;
-import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -99,13 +98,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 	private transient InternalTimerService<VoidNamespace> timerService;
 
 	/**
-	 * {@link OutputTag} to use for late arriving events. Elements for which
-	 * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
-	 * be emitted to this.
-	 */
-	private final OutputTag<IN> lateDataOutputTag;
-
-	/**
 	 * The last seen watermark. This will be used to
 	 * decide if an incoming element is late or not.
 	 */
@@ -123,7 +115,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 			final KeySelector<IN, KEY> keySelector,
 			final TypeSerializer<KEY> keySerializer,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
-			final OutputTag<IN> lateDataOutputTag,
 			final boolean migratingFromOldKeyedOperator) {
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
@@ -132,7 +123,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
 
-		this.lateDataOutputTag = lateDataOutputTag;
 		this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
 	}
 
@@ -203,8 +193,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 					priorityQueue.offer(element);
 				}
 				updatePriorityQueue(priorityQueue);
-			} else {
-				sideOutputLateElement(element);
 			}
 		}
 	}
@@ -266,18 +254,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 		this.lastWatermark = timestamp;
 	}
 
-	/**
-	 * Puts the provided late element in the dedicated side output,
-	 * if the user has specified one.
-	 *
-	 * @param element The late element.
-	 */
-	private void sideOutputLateElement(StreamRecord<IN> element) {
-		if (lateDataOutputTag != null) {
-			output.collect(lateDataOutputTag, element);
-		}
-	}
-
 	private NFA<IN> getNFA() throws IOException {
 		NFA<IN> nfa = nfaOperatorState.value();
 		return nfa != null ? nfa : nfaFactory.createNFA();

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 065c244..08424a4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 
 import java.util.List;
 import java.util.Map;
@@ -49,7 +48,7 @@ public class CEPOperatorUtils {
 	 * @return Data stream containing fully matched event sequences stored in a {@link Map}. The
 	 * events are indexed by their associated names of the pattern.
 	 */
-	public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
+	public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
 		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
 		// check whether we use processing time
@@ -76,7 +75,6 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
-					lateDataOutputTag,
 					true));
 		} else {
 
@@ -92,7 +90,6 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
-					lateDataOutputTag,
 					false
 				)).forceNonParallel();
 		}
@@ -110,7 +107,7 @@ public class CEPOperatorUtils {
 	 * a {@link Either} instance.
 	 */
 	public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream(
-			DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
+			DataStream<T> inputStream, Pattern<T, ?> pattern) {
 
 		final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
@@ -142,7 +139,6 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
-					lateDataOutputTag,
 					true));
 		} else {
 
@@ -158,7 +154,6 @@ public class CEPOperatorUtils {
 					keySelector,
 					keySerializer,
 					nfaFactory,
-					lateDataOutputTag,
 					false
 				)).forceNonParallel();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index f48f5c3..4d68afb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -48,10 +47,9 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
-			OutputTag<IN> lateDataOutputTag,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 618a94d..9061bcb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.List;
@@ -48,10 +47,9 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
 			KeySelector<IN, KEY> keySelector,
 			TypeSerializer<KEY> keySerializer,
 			NFACompiler.NFAFactory<IN> nfaFactory,
-			OutputTag<IN> lateDataOutputTag,
 			boolean migratingFromOldKeyedOperator) {
 
-		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+		super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index a6e925d..9a08659 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -581,99 +580,4 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
-
-	@Test
-	public void testLateEventSideOutput() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-		env.setParallelism(1);
-
-		// (Event, timestamp)
-		DataStream<Event> input = env.fromElements(
-				Tuple2.of(new Event(1, "start", 1.0), 1L),
-				Tuple2.of(new Event(2, "middle", 2.0), 2L),
-				Tuple2.of(new Event(3, "end", 3.0), 15L),
-				Tuple2.of(new Event(4, "middle", 5.0), 7L),
-				Tuple2.of(new Event(6, "start", 1.0), 21L),
-				Tuple2.of(new Event(5, "middle", 5.0), 10L),
-				Tuple2.of(new Event(7, "middle", 2.0), 22L),
-				Tuple2.of(new Event(8, "end", 3.0), 23L)
-		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
-
-			@Override
-			public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
-				return element.f1;
-			}
-
-			@Override
-			public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
-				return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
-			}
-
-		}).map(new MapFunction<Tuple2<Event, Long>, Event>() {
-
-			@Override
-			public Event map(Tuple2<Event, Long> value) throws Exception {
-				return value.f0;
-			}
-		});
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("start");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("middle");
-			}
-		}).followedByAny("end").where(new SimpleCondition<Event>() {
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("end");
-			}
-		});
-
-		final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
-
-		PatternStream<Event> patternStream = CEP.pattern(input, pattern).sideOutputLateData(lateOutputTag);
-		DataStream<String> result = patternStream.select(
-				new PatternSelectFunction<Event, String>() {
-
-					@Override
-					public String select(Map<String, List<Event>> pattern) {
-						StringBuilder builder = new StringBuilder();
-
-						builder.append(pattern.get("start").get(0).getId()).append(",")
-								.append(pattern.get("middle").get(0).getId()).append(",")
-								.append(pattern.get("end").get(0).getId());
-						return builder.toString();
-					}
-				}
-		);
-
-		DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
-
-		// we just care for the late events in this test.
-		lateEvents.map(
-				new MapFunction<Event, Integer>() {
-
-					@Override
-					public Integer map(Event value) throws Exception {
-						return value.getId();
-					}
-				}
-		).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
-
-		// the expected sequence of late event ids
-		expectedLateEvents = "4\n5";
-
-		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
-		expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
-		env.execute();
-	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index afb3e7c..789d000 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -81,7 +81,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -129,7 +128,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -204,7 +202,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -250,7 +247,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -337,7 +333,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new SinglePatternNFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -376,7 +371,6 @@ public class CEPFrom12MigrationTest {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new SinglePatternNFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 404de54..e5719c5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -104,7 +104,6 @@ public class CEPMigration11to13Test {
 								keySelector,
 								IntSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								true),
 						keySelector,
 						BasicTypeInfo.INT_TYPE_INFO);
@@ -179,7 +178,6 @@ public class CEPMigration11to13Test {
 								keySelector,
 								ByteSerializer.INSTANCE,
 								new NFAFactory(),
-								null,
 								false),
 						keySelector,
 						BasicTypeInfo.BYTE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 5ed8b46..74bddbb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -227,7 +227,6 @@ public class CEPOperatorTest extends TestLogger {
 				keySelector,
 				IntSerializer.INSTANCE,
 				new NFAFactory(true),
-				null,
 				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO);
@@ -487,7 +486,6 @@ public class CEPOperatorTest extends TestLogger {
 			keySelector,
 			IntSerializer.INSTANCE,
 			new NFAFactory(),
-			null,
 			true);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 0210ef9..9eb8da2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -347,7 +347,6 @@ public class CEPRescalingTest {
 				keySelector,
 				BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
 				new NFAFactory(),
-				null,
 				true),
 			keySelector,
 			BasicTypeInfo.INT_TYPE_INFO,