You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/17 12:38:49 UTC
[3/9] flink git commit: [FLINK-6255] [cep] Remove PatternStream.getSideOutput().
[FLINK-6255] [cep] Remove PatternStream.getSideOutput().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ad87f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ad87f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ad87f4
Branch: refs/heads/master
Commit: 05ad87f4ce8c0aea6944feb14bf19795c1fc56c9
Parents: 02ea418
Author: kl0u <kk...@gmail.com>
Authored: Fri May 12 16:01:38 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Wed May 17 14:37:32 2017 +0200
----------------------------------------------------------------------
docs/dev/libs/cep.md | 41 +-------
.../apache/flink/cep/scala/PatternStream.scala | 35 +------
.../org/apache/flink/cep/PatternStream.java | 55 +----------
.../AbstractKeyedCEPPatternOperator.java | 24 -----
.../flink/cep/operator/CEPOperatorUtils.java | 9 +-
.../cep/operator/KeyedCEPPatternOperator.java | 4 +-
.../TimeoutKeyedCEPPatternOperator.java | 4 +-
.../java/org/apache/flink/cep/CEPITCase.java | 98 +-------------------
.../cep/operator/CEPFrom12MigrationTest.java | 6 --
.../cep/operator/CEPMigration11to13Test.java | 2 -
.../flink/cep/operator/CEPOperatorTest.java | 2 -
.../flink/cep/operator/CEPRescalingTest.java | 1 -
12 files changed, 13 insertions(+), 268 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index b379615..58e1a0a 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -806,46 +806,7 @@ in event time.
To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
*correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
-seen watermark. Late elements are not further processed but they can be redirected to a [side output]
-({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them.
-
-To access the stream of late elements, you first need to specify that you want to get the late data using
-`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do
-so, the late elements will be silently dropped. Then, you can get the side-output stream using the
-`.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in
-the `.sideOutputLateData(OutputTag)`:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
-
-PatternStream<T> patternStream = CEP.pattern(...)
- .sideOutputLateData(lateOutputTag);
-
-// main output with matches
-DataStream<O> result = patternStream.select(...)
-
-// side output containing the late events
-DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val lateOutputTag = OutputTag[T]("late-data")
-
-val patternStream: PatternStream[T] = CEP.pattern(...)
- .sideOutputLateData(lateOutputTag)
-
-// main output with matches
-val result = patternStream.select(...)
-
-// side output containing the late events
-val lateStream = patternStream.getSideOutput(lateOutputTag)
-{% endhighlight %}
-</div>
-</div>
+seen watermark. Late elements are not further processed.
## Examples
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index d4bc28c..e71439c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
import org.apache.flink.cep.pattern.{Pattern => JPattern}
import org.apache.flink.streaming.api.scala.{asScalaStream, _}
-import org.apache.flink.util.{Collector, OutputTag}
+import org.apache.flink.util.Collector
import org.apache.flink.types.{Either => FEither}
import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
import java.lang.{Long => JLong}
-import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.cep.operator.CEPOperatorUtils
import org.apache.flink.cep.scala.pattern.Pattern
@@ -47,23 +46,8 @@ import scala.collection.mutable
*/
class PatternStream[T](jPatternStream: JPatternStream[T]) {
- private[flink] var lateDataOutputTag: OutputTag[T] = null
-
private[flink] def wrappedPatternStream = jPatternStream
-
- /**
- * Send late arriving data to the side output identified by the given {@link OutputTag}. The
- * CEP library assumes correctness of the watermark, so an element is considered late if its
- * timestamp is smaller than the last received watermark.
- */
- @PublicEvolving
- def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = {
- jPatternStream.sideOutputLateData(outputTag)
- lateDataOutputTag = outputTag
- this
- }
-
def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream())
@@ -110,8 +94,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
- jPatternStream.getPattern(),
- lateDataOutputTag)
+ jPatternStream.getPattern())
val cleanedSelect = cleanClosure(patternSelectFunction)
val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -176,8 +159,7 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
: DataStream[Either[L, R]] = {
val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
jPatternStream.getInputStream(),
- jPatternStream.getPattern(),
- lateDataOutputTag
+ jPatternStream.getPattern()
)
val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -338,17 +320,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
}
-
- /**
- * Gets the {@link DataStream} that contains the elements that are emitted from an operation
- * into the side output with the given {@link OutputTag}.
- *
- * @param tag The tag identifying a specific side output.
- */
- @PublicEvolving
- def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
- asScalaStream(jPatternStream.getSideOutput(tag))
- }
}
object PatternStream {
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 04dff49..5544689 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-import org.apache.flink.util.Preconditions;
import java.util.List;
import java.util.Map;
@@ -54,19 +52,6 @@ public class PatternStream<T> {
private final Pattern<T, ?> pattern;
- /**
- * A reference to the created pattern stream used to get
- * the registered side outputs, e.g late elements side output.
- */
- private SingleOutputStreamOperator<?> patternStream;
-
- /**
- * {@link OutputTag} to use for late arriving events. Elements for which
- * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
- * be emitted to this.
- */
- private OutputTag<T> lateDataOutputTag;
-
PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
this.inputStream = inputStream;
this.pattern = pattern;
@@ -81,22 +66,6 @@ public class PatternStream<T> {
}
/**
- * Send late arriving data to the side output identified by the given {@link OutputTag}. The
- * CEP library assumes correctness of the watermark, so an element is considered late if its
- * timestamp is smaller than the last received watermark.
- */
- public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
- Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
- Preconditions.checkArgument(lateDataOutputTag == null,
- "The late side output tag has already been initialized to " + lateDataOutputTag + ".");
- Preconditions.checkArgument(patternStream == null,
- "The late side output tag has to be set before calling select() or flatSelect().");
-
- this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag);
- return this;
- }
-
- /**
* Applies a select function to the detected pattern sequence. For each pattern sequence the
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
* exactly one resulting element.
@@ -137,8 +106,7 @@ public class PatternStream<T> {
*/
public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
- CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
- this.patternStream = patternStream;
+ CEPOperatorUtils.createPatternStream(inputStream, pattern);
return patternStream.map(
new PatternSelectMapper<>(
@@ -169,8 +137,7 @@ public class PatternStream<T> {
final PatternSelectFunction<T, R> patternSelectFunction) {
SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
- CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
- this.patternStream = patternStream;
+ CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternTimeoutFunction,
@@ -240,8 +207,7 @@ public class PatternStream<T> {
*/
public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
SingleOutputStreamOperator<Map<String, List<T>>> patternStream =
- CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
- this.patternStream = patternStream;
+ CEPOperatorUtils.createPatternStream(inputStream, pattern);
return patternStream.flatMap(
new PatternFlatSelectMapper<>(
@@ -273,8 +239,7 @@ public class PatternStream<T> {
final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> patternStream =
- CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag);
- this.patternStream = patternStream;
+ CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern);
TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
patternFlatTimeoutFunction,
@@ -305,18 +270,6 @@ public class PatternStream<T> {
}
/**
- * Gets the {@link DataStream} that contains the elements that are emitted from an operation
- * into the side output with the given {@link OutputTag}.
- *
- * @param sideOutputTag The tag identifying a specific side output.
- */
- public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
- Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " +
- "To have the late element side output, you have to first define the main output using select() or flatSelect().");
- return patternStream.getSideOutput(sideOutputTag);
- }
-
- /**
* Wrapper for a {@link PatternSelectFunction}.
*
* @param <T> Type of the input elements
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3afe397..7068bc4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -47,7 +47,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Migration;
-import org.apache.flink.util.OutputTag;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -99,13 +98,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
private transient InternalTimerService<VoidNamespace> timerService;
/**
- * {@link OutputTag} to use for late arriving events. Elements for which
- * {@code window.maxTimestamp + allowedLateness} is smaller than the current watermark will
- * be emitted to this.
- */
- private final OutputTag<IN> lateDataOutputTag;
-
- /**
* The last seen watermark. This will be used to
* decide if an incoming element is late or not.
*/
@@ -123,7 +115,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
final KeySelector<IN, KEY> keySelector,
final TypeSerializer<KEY> keySerializer,
final NFACompiler.NFAFactory<IN> nfaFactory,
- final OutputTag<IN> lateDataOutputTag,
final boolean migratingFromOldKeyedOperator) {
this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
@@ -132,7 +123,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
this.keySerializer = Preconditions.checkNotNull(keySerializer);
this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
- this.lateDataOutputTag = lateDataOutputTag;
this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator;
}
@@ -203,8 +193,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
priorityQueue.offer(element);
}
updatePriorityQueue(priorityQueue);
- } else {
- sideOutputLateElement(element);
}
}
}
@@ -266,18 +254,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
this.lastWatermark = timestamp;
}
- /**
- * Puts the provided late element in the dedicated side output,
- * if the user has specified one.
- *
- * @param element The late element.
- */
- private void sideOutputLateElement(StreamRecord<IN> element) {
- if (lateDataOutputTag != null) {
- output.collect(lateDataOutputTag, element);
- }
- }
-
private NFA<IN> getNFA() throws IOException {
NFA<IN> nfa = nfaOperatorState.value();
return nfa != null ? nfa : nfaFactory.createNFA();
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index 065c244..08424a4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
import java.util.List;
import java.util.Map;
@@ -49,7 +48,7 @@ public class CEPOperatorUtils {
* @return Data stream containing fully matched event sequences stored in a {@link Map}. The
* events are indexed by their associated names of the pattern.
*/
- public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
+ public static <K, T> SingleOutputStreamOperator<Map<String, List<T>>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
// check whether we use processing time
@@ -76,7 +75,6 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
- lateDataOutputTag,
true));
} else {
@@ -92,7 +90,6 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
- lateDataOutputTag,
false
)).forceNonParallel();
}
@@ -110,7 +107,7 @@ public class CEPOperatorUtils {
* a {@link Either} instance.
*/
public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, List<T>>, Long>, Map<String, List<T>>>> createTimeoutPatternStream(
- DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) {
+ DataStream<T> inputStream, Pattern<T, ?> pattern) {
final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
@@ -142,7 +139,6 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
- lateDataOutputTag,
true));
} else {
@@ -158,7 +154,6 @@ public class CEPOperatorUtils {
keySelector,
keySerializer,
nfaFactory,
- lateDataOutputTag,
false
)).forceNonParallel();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index f48f5c3..4d68afb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
import java.util.Collection;
import java.util.Iterator;
@@ -48,10 +47,9 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
- OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index 618a94d..9061bcb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,7 +25,6 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
import java.util.Collection;
import java.util.List;
@@ -48,10 +47,9 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
KeySelector<IN, KEY> keySelector,
TypeSerializer<KEY> keySerializer,
NFACompiler.NFAFactory<IN> nfaFactory,
- OutputTag<IN> lateDataOutputTag,
boolean migratingFromOldKeyedOperator) {
- super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
+ super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index a6e925d..9a08659 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.types.Either;
-import org.apache.flink.util.OutputTag;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -581,99 +580,4 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
env.execute();
}
-
- @Test
- public void testLateEventSideOutput() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- env.setParallelism(1);
-
- // (Event, timestamp)
- DataStream<Event> input = env.fromElements(
- Tuple2.of(new Event(1, "start", 1.0), 1L),
- Tuple2.of(new Event(2, "middle", 2.0), 2L),
- Tuple2.of(new Event(3, "end", 3.0), 15L),
- Tuple2.of(new Event(4, "middle", 5.0), 7L),
- Tuple2.of(new Event(6, "start", 1.0), 21L),
- Tuple2.of(new Event(5, "middle", 5.0), 10L),
- Tuple2.of(new Event(7, "middle", 2.0), 22L),
- Tuple2.of(new Event(8, "end", 3.0), 23L)
- ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
-
- @Override
- public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
- return element.f1;
- }
-
- @Override
- public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
- return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
- }
-
- }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
-
- @Override
- public Event map(Tuple2<Event, Long> value) throws Exception {
- return value.f0;
- }
- });
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("middle");
- }
- }).followedByAny("end").where(new SimpleCondition<Event>() {
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- });
-
- final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
-
- PatternStream<Event> patternStream = CEP.pattern(input, pattern).sideOutputLateData(lateOutputTag);
- DataStream<String> result = patternStream.select(
- new PatternSelectFunction<Event, String>() {
-
- @Override
- public String select(Map<String, List<Event>> pattern) {
- StringBuilder builder = new StringBuilder();
-
- builder.append(pattern.get("start").get(0).getId()).append(",")
- .append(pattern.get("middle").get(0).getId()).append(",")
- .append(pattern.get("end").get(0).getId());
- return builder.toString();
- }
- }
- );
-
- DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
-
- // we just care for the late events in this test.
- lateEvents.map(
- new MapFunction<Event, Integer>() {
-
- @Override
- public Integer map(Event value) throws Exception {
- return value.getId();
- }
- }
- ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
-
- // the expected sequence of late event ids
- expectedLateEvents = "4\n5";
-
- result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
- expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
- env.execute();
- }
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index afb3e7c..789d000 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -81,7 +81,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -129,7 +128,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -204,7 +202,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -250,7 +247,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -337,7 +333,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new SinglePatternNFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -376,7 +371,6 @@ public class CEPFrom12MigrationTest {
keySelector,
IntSerializer.INSTANCE,
new SinglePatternNFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index 404de54..e5719c5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -104,7 +104,6 @@ public class CEPMigration11to13Test {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -179,7 +178,6 @@ public class CEPMigration11to13Test {
keySelector,
ByteSerializer.INSTANCE,
new NFAFactory(),
- null,
false),
keySelector,
BasicTypeInfo.BYTE_TYPE_INFO);
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 5ed8b46..74bddbb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -227,7 +227,6 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(true),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO);
@@ -487,7 +486,6 @@ public class CEPOperatorTest extends TestLogger {
keySelector,
IntSerializer.INSTANCE,
new NFAFactory(),
- null,
true);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/05ad87f4/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 0210ef9..9eb8da2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -347,7 +347,6 @@ public class CEPRescalingTest {
keySelector,
BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
new NFAFactory(),
- null,
true),
keySelector,
BasicTypeInfo.INT_TYPE_INFO,