You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:09 UTC
[02/50] [abbrv] flink git commit: [FLINK-6208] [cep] Implement skip
till next match strategy
[FLINK-6208] [cep] Implement skip till next match strategy
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c35dc0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c35dc0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c35dc0e
Branch: refs/heads/table-retraction
Commit: 7c35dc0e40a96f6b8d367b9a80a19e9a22525043
Parents: b527582
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Wed Apr 26 11:57:53 2017 +0200
Committer: kl0u <kk...@gmail.com>
Committed: Fri Apr 28 14:28:37 2017 +0200
----------------------------------------------------------------------
docs/dev/libs/cep.md | 19 +-
.../cep/scala/pattern/FollowedByPattern.scala | 44 --
.../flink/cep/scala/pattern/Pattern.scala | 31 +-
.../flink/cep/scala/pattern/package.scala | 3 +-
.../flink/cep/scala/pattern/PatternTest.scala | 52 +-
.../flink/cep/nfa/compiler/NFACompiler.java | 192 ++---
.../flink/cep/pattern/FollowedByPattern.java | 33 -
.../org/apache/flink/cep/pattern/Pattern.java | 42 +-
.../apache/flink/cep/pattern/Quantifier.java | 82 +-
.../java/org/apache/flink/cep/CEPITCase.java | 30 +-
.../org/apache/flink/cep/nfa/NFAITCase.java | 760 +++++++++----------
.../flink/cep/nfa/compiler/NFACompilerTest.java | 39 -
.../flink/cep/operator/CEPOperatorTest.java | 5 +-
.../apache/flink/cep/pattern/PatternTest.java | 9 +-
14 files changed, 656 insertions(+), 685 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 15afdf5..b379615 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -268,7 +268,7 @@ val strictNext: Pattern[Event, _] = start.next("middle")
</div>
Non-strict contiguity means that other events are allowed to occur in-between two matching events.
-A non-strict contiguity pattern state can be created via the `followedBy` method.
+A non-strict contiguity pattern state can be created via the `followedBy` or `followedByAny` method.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
@@ -282,6 +282,23 @@ Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
{% endhighlight %}
</div>
+
+For non-strict contiguity one can specify if only the first succeeding matching event will be matched, or
+all. In the latter case multiple matches will be emitted for the same beginning.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedByAny("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle")
+{% endhighlight %}
+</div>
+
</div>
It is also possible to define a temporal constraint for the pattern to be valid.
For example, one can define that a pattern should occur within 10 seconds via the `within` method.
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
deleted file mode 100644
index 4bda08f..0000000
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.scala.pattern
-
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern}
-
-object FollowedByPattern {
- /**
- * Constructs a new Pattern by wrapping a given Java API Pattern
- *
- * @param jfbPattern Underlying Java API Pattern.
- * @tparam T Base type of the elements appearing in the pattern
- * @tparam F Subtype of T to which the current pattern operator is constrained
- * @return New wrapping FollowedByPattern object
- */
- def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
- new FollowedByPattern[T, F](jfbPattern)
-}
-
-/**
- * Pattern operator which signifies that the there is a non-strict temporal contiguity between
- * itself and its preceding pattern operator. This means that there might be events in between
- * two matching events. These events are then simply ignored.
- *
- * @tparam T Base type of the events
- * @tparam F Subtype of T to which the operator is currently constrained
- */
-class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F])
- extends Pattern[T, F](jfbPattern)
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 3bdbfcf..65b7ab0 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -46,22 +46,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
/**
* @return The previous pattern
*/
- def getPrevious(): Option[Pattern[T, _ <: T]] = {
- wrapPattern(jPattern.getPrevious())
+ def getPrevious: Option[Pattern[T, _ <: T]] = {
+ wrapPattern(jPattern.getPrevious)
}
/**
*
* @return Name of the pattern operator
*/
- def getName(): String = jPattern.getName()
+ def getName: String = jPattern.getName
/**
*
* @return Window length in which the pattern match has to occur
*/
- def getWindowTime(): Option[Time] = {
- Option(jPattern.getWindowTime())
+ def getWindowTime: Option[Time] = {
+ Option(jPattern.getWindowTime)
}
/**
@@ -70,8 +70,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
*/
def getQuantifier: Quantifier = jPattern.getQuantifier
- def getCondition(): Option[IterativeCondition[F]] = {
- Option(jPattern.getCondition())
+ def getCondition: Option[IterativeCondition[F]] = {
+ Option(jPattern.getCondition)
}
/**
@@ -208,7 +208,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
- def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name))
+ def followedBy(name: String): Pattern[T, T] = {
+ Pattern[T, T](jPattern.followedBy(name))
+ }
+
+ /**
+ * Appends a new pattern to the existing one. The new pattern enforces non-strict
+ * temporal contiguity. This means that a matching event of this pattern and the
+ * preceding matching event might be interleaved with other events which are ignored.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ def followedByAny(name: String): Pattern[T, T] = {
+ Pattern[T, T](jPattern.followedByAny(name))
+ }
+
/**
* Specifies that this pattern is optional for a final match of the pattern
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
index 382c160..26355a5 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.cep.scala
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
package object pattern {
/**
@@ -31,7 +31,6 @@ package object pattern {
*/
private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F])
: Option[Pattern[T, F]] = javaPattern match {
- case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f))
case p: JPattern[T, F] => Some(Pattern[T, F](p))
case _ => None
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index a95dddd..d574513 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
import org.junit.Test
import org.apache.flink.cep.Event
import org.apache.flink.cep.SubEvent
-import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy
import org.apache.flink.cep.pattern.conditions._
class PatternTest {
@@ -33,7 +33,7 @@ class PatternTest {
*/
@Test
- def testStrictContiguity: Unit = {
+ def testStrictContiguity(): Unit = {
val pattern = Pattern.begin[Event]("start").next("next").next("end")
val jPattern = JPattern.begin[Event]("start").next("next").next("end")
@@ -56,7 +56,7 @@ class PatternTest {
@Test
- def testNonStrictContiguity: Unit = {
+ def testNonStrictContiguity(): Unit = {
val pattern = Pattern.begin[Event]("start").followedBy("next").followedBy("end")
val jPattern = JPattern.begin[Event]("start").followedBy("next").followedBy("end")
@@ -69,8 +69,8 @@ class PatternTest {
assertTrue(previous.getPrevious.isDefined)
assertFalse(preprevious.getPrevious.isDefined)
- assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
- assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]])
+ assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy)
+ assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, previous.getQuantifier.getConsumingStrategy)
assertEquals(pattern.getName, "end")
assertEquals(previous.getName, "next")
@@ -78,25 +78,25 @@ class PatternTest {
}
@Test
- def testStrictContiguityWithCondition: Unit = {
+ def testStrictContiguityWithCondition(): Unit = {
val pattern = Pattern.begin[Event]("start")
.next("next")
- .where((value: Event, ctx: Context[Event]) => value.getName() == "foobar")
+ .where((value: Event, _) => value.getName == "foobar")
.next("end")
- .where((value: Event, ctx: Context[Event]) => value.getId() == 42)
+ .where((value: Event, _) => value.getId == 42)
val jPattern = JPattern.begin[Event]("start")
.next("next")
.where(new SimpleCondition[Event]() {
@throws[Exception]
def filter(value: Event): Boolean = {
- return value.getName() == "foobar"
+ value.getName == "foobar"
}
}).next("end")
.where(new SimpleCondition[Event]() {
@throws[Exception]
def filter(value: Event): Boolean = {
- return value.getId() == 42
+ value.getId == 42
}
})
@@ -120,7 +120,7 @@ class PatternTest {
}
@Test
- def testPatternWithSubtyping: Unit = {
+ def testPatternWithSubtyping(): Unit = {
val pattern = Pattern.begin[Event]("start")
.next("subevent")
.subtype(classOf[SubEvent])
@@ -150,11 +150,11 @@ class PatternTest {
}
@Test
- def testPatternWithSubtypingAndFilter: Unit = {
+ def testPatternWithSubtypingAndFilter(): Unit = {
val pattern = Pattern.begin[Event]("start")
.next("subevent")
.subtype(classOf[SubEvent])
- .where((value: SubEvent) => false)
+ .where(_ => false)
.followedBy("end")
val jpattern = JPattern.begin[Event]("start")
@@ -163,7 +163,7 @@ class PatternTest {
.where(new SimpleCondition[SubEvent]() {
@throws[Exception]
def filter(value: SubEvent): Boolean = {
- return false
+ false
}
}).followedBy("end")
@@ -178,8 +178,8 @@ class PatternTest {
assertTrue(previous.getPrevious.isDefined)
assertFalse(preprevious.getPrevious.isDefined)
- assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
- assertTrue(previous.getCondition().isDefined)
+ assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy)
+ assertTrue(previous.getCondition.isDefined)
assertEquals(pattern.getName, "end")
assertEquals(previous.getName, "subevent")
@@ -194,25 +194,25 @@ class PatternTest {
&& threeWayEquals(
pattern.getName,
pattern.wrappedPattern.getName,
- jPattern.getName())
+ jPattern.getName)
//check equal time windows
&& threeWayEquals(
pattern.getWindowTime.orNull,
pattern.wrappedPattern.getWindowTime,
- jPattern.getWindowTime())
+ jPattern.getWindowTime)
//check congruent class names / types
&& threeWayEquals(
pattern.getClass.getSimpleName,
pattern.wrappedPattern.getClass.getSimpleName,
- jPattern.getClass().getSimpleName())
+ jPattern.getClass.getSimpleName)
//best effort to confirm congruent filter functions
&& compareFilterFunctions(
- pattern.getCondition().orNull,
- jPattern.getCondition())
+ pattern.getCondition.orNull,
+ jPattern.getCondition)
//recursively check previous patterns
&& checkCongruentRepresentations(
pattern.getPrevious.orNull,
- jPattern.getPrevious()))
+ jPattern.getPrevious))
}
def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = {
@@ -233,15 +233,15 @@ class PatternTest {
(sFilter, jFilter) match {
//matching types: and-filter; branch and recurse for inner filters
case (saf: AndCondition[_], jaf: AndCondition[_])
- => (compareFilterFunctions(saf.getLeft(), jaf.getLeft())
- && compareFilterFunctions(saf.getRight(), jaf.getRight()))
+ => (compareFilterFunctions(saf.getLeft, jaf.getLeft)
+ && compareFilterFunctions(saf.getRight, jaf.getRight))
//matching types: subtype-filter
- case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true
+ case (_: SubtypeCondition[_], _: SubtypeCondition[_]) => true
//mismatch: one-sided and/subtype-filter
case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false
case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false
//from here we can only check mutual presence or absence of a function
- case (s: IterativeCondition[_], j: IterativeCondition[_]) => true
+ case (_: IterativeCondition[_], _: IterativeCondition[_]) => true
case (null, null) => true
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b4e0557..0ca0e14 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -20,32 +20,30 @@ package org.apache.flink.cep.nfa.compiler;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.StateTransitionAction;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.FollowedByPattern;
import org.apache.flink.cep.pattern.MalformedPatternException;
-import org.apache.flink.cep.pattern.conditions.NotCondition;
import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.NotCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
-import javax.annotation.Nullable;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
/**
* Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
* {@link NFAFactory}.
@@ -159,25 +157,7 @@ public class NFACompiler {
State<T> lastSink = sinkState;
while (currentPattern.getPrevious() != null) {
- checkPatternNameUniqueness();
- usedNames.add(currentPattern.getName());
-
- if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
- final State<T> looping = createLooping(lastSink);
-
- if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
- lastSink = createFirstMandatoryStateOfLoop(looping);
- } else if (currentPattern instanceof FollowedByPattern &&
- currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
- lastSink = createWaitingStateForZeroOrMore(looping, lastSink);
- } else {
- lastSink = looping;
- }
- } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
- lastSink = createTimesState(lastSink, currentPattern.getTimes());
- } else {
- lastSink = createSingletonState(lastSink);
- }
+ lastSink = convertPattern(lastSink);
currentPattern = currentPattern.getPrevious();
final Time currentWindowTime = currentPattern.getWindowTime();
@@ -190,6 +170,29 @@ public class NFACompiler {
return lastSink;
}
+ private State<T> convertPattern(final State<T> sinkState) {
+ final State<T> lastSink;
+ checkPatternNameUniqueness();
+ usedNames.add(currentPattern.getName());
+
+ final Quantifier quantifier = currentPattern.getQuantifier();
+ if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
+ final State<T> looping = createLooping(sinkState);
+
+ if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+ lastSink = createFirstMandatoryStateOfLoop(looping);
+ } else {
+ lastSink = createWaitingStateForZeroOrMore(looping, sinkState);
+ }
+ } else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+ lastSink = createTimesState(sinkState, currentPattern.getTimes());
+ } else {
+ lastSink = createSingletonState(sinkState);
+ }
+
+ return lastSink;
+ }
+
/**
* Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
*
@@ -197,19 +200,21 @@ public class NFACompiler {
* @param lastSink the state that the looping one points to
* @return the newly created state
*/
+ @SuppressWarnings("unchecked")
private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
- final State<T> followByState = createNormalState();
- final State<T> followByStateWithoutProceed = createNormalState();
-
final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
- final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+ final State<T> followByState = createNormalState();
followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
- followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
followByState.addTake(loopingState, currentFunction);
- followByStateWithoutProceed.addIgnore(ignoreFunction);
- followByStateWithoutProceed.addTake(loopingState, currentFunction);
+ final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+ if (ignoreFunction != null) {
+ final State<T> followByStateWithoutProceed = createNormalState();
+ followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
+ followByStateWithoutProceed.addIgnore(ignoreFunction);
+ followByStateWithoutProceed.addTake(loopingState, currentFunction);
+ }
return followByState;
}
@@ -230,23 +235,7 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createStartState(State<T> sinkState) {
- checkPatternNameUniqueness();
- usedNames.add(currentPattern.getName());
-
- final State<T> beginningState;
- if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
- final State<T> loopingState = createLooping(sinkState);
- if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
- beginningState = createFirstMandatoryStateOfLoop(loopingState);
- } else {
- beginningState = loopingState;
- }
- } else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
- beginningState = createTimesState(sinkState, currentPattern.getTimes());
- } else {
- beginningState = createSingletonState(sinkState);
- }
-
+ final State<T> beginningState = convertPattern(sinkState);
beginningState.makeStart();
return beginningState;
@@ -263,29 +252,26 @@ public class NFACompiler {
private State<T> createTimesState(final State<T> sinkState, int times) {
State<T> lastSink = sinkState;
for (int i = 0; i < times - 1; i++) {
- lastSink = createSingletonState(
- lastSink,
- !currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE),
- false);
+ lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
}
+ final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+
// we created the intermediate states in the loop, now we create the start of the loop.
- if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
- return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern, false);
+ if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+ return createSingletonState(lastSink, ignoreCondition, false);
}
- final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
- final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
-
final State<T> singletonState = createNormalState();
singletonState.addTake(lastSink, currentFilterFunction);
- singletonState.addProceed(sinkState, trueFunction);
+ singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
- if (currentPattern instanceof FollowedByPattern) {
+ if (ignoreCondition != null) {
State<T> ignoreState = createNormalState();
ignoreState.addTake(lastSink, currentFilterFunction);
- ignoreState.addIgnore(trueFunction);
- singletonState.addIgnore(ignoreState, trueFunction);
+ ignoreState.addIgnore(ignoreCondition);
+ singletonState.addIgnore(ignoreState, ignoreCondition);
}
return singletonState;
}
@@ -300,8 +286,10 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createSingletonState(final State<T> sinkState) {
- return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern,
- currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL));
+ return createSingletonState(
+ sinkState,
+ getIgnoreCondition(currentPattern),
+ currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
}
/**
@@ -309,12 +297,12 @@ public class NFACompiler {
* of a similar state without the PROCEED edge, so that for each PROCEED transition branches
* in computation state graph can be created only once.
*
- * @param addIgnore if any IGNORE should be added
+ * @param ignoreCondition condition that should be applied to IGNORE transition
* @param sinkState state that the state being converted should point to
* @return the created state
*/
@SuppressWarnings("unchecked")
- private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore, boolean isOptional) {
+ private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
@@ -325,7 +313,7 @@ public class NFACompiler {
singletonState.addProceed(sinkState, trueFunction);
}
- if (addIgnore) {
+ if (ignoreCondition != null) {
final State<T> ignoreState;
if (isOptional) {
ignoreState = createNormalState();
@@ -333,7 +321,7 @@ public class NFACompiler {
} else {
ignoreState = singletonState;
}
- singletonState.addIgnore(ignoreState, getIgnoreCondition(currentPattern));
+ singletonState.addIgnore(ignoreState, ignoreCondition);
}
return singletonState;
}
@@ -352,8 +340,8 @@ public class NFACompiler {
final State<T> firstState = createNormalState();
firstState.addTake(sinkState, currentFilterFunction);
- if (currentPattern instanceof FollowedByPattern) {
- final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+ final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+ if (ignoreCondition != null) {
firstState.addIgnore(ignoreCondition);
}
return firstState;
@@ -369,18 +357,16 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createLooping(final State<T> sinkState) {
-
- final State<T> loopingState = createNormalState();
final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+ final State<T> loopingState = createNormalState();
loopingState.addProceed(sinkState, trueFunction);
loopingState.addTake(filterFunction);
- if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
- final State<T> ignoreState = createNormalState();
-
- final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+ if (ignoreCondition != null) {
+ final State<T> ignoreState = createNormalState();
ignoreState.addTake(loopingState, filterFunction);
ignoreState.addIgnore(ignoreCondition);
loopingState.addIgnore(ignoreState, ignoreCondition);
@@ -403,15 +389,37 @@ public class NFACompiler {
/**
* @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
- * that corresponds to the specified {@link Pattern}. If the pattern is
- * {@link QuantifierProperty#EAGER}, the negated user-specified condition is
- * returned. In other case, a condition that always evaluated to {@code true} is
- * returned.
+ * that corresponds to the specified {@link Pattern}. It is applicable only for inner states of a complex
+ * state like looping or times.
*/
+ @SuppressWarnings("unchecked")
+ private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) {
+ switch (pattern.getQuantifier().getInnerConsumingStrategy()) {
+ case STRICT:
+ return null;
+ case SKIP_TILL_NEXT:
+ return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+ case SKIP_TILL_ANY:
+ return BooleanConditions.trueFunction();
+ }
+ return null;
+ }
+
+ /**
+ * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
+ * that corresponds to the specified {@link Pattern}. For more on strategy see {@link Quantifier}
+ */
+ @SuppressWarnings("unchecked")
private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
- return pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)
- ? new NotCondition<>((IterativeCondition<T>) pattern.getCondition())
- : BooleanConditions.<T>trueFunction();
+ switch (pattern.getQuantifier().getConsumingStrategy()) {
+ case STRICT:
+ return null;
+ case SKIP_TILL_NEXT:
+ return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+ case SKIP_TILL_ANY:
+ return BooleanConditions.trueFunction();
+ }
+ return null;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
deleted file mode 100644
index 266451c..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern;
-
-/**
- * Pattern operator which signifies that the there is a non-strict temporal contiguity between
- * itself and its preceding pattern operator. This means that there might be events in between
- * two matching events. These events are then simply ignored.
- *
- * @param <T> Base type of the events
- * @param <F> Subtype of T to which the operator is currently constrained
- */
-public class FollowedByPattern<T, F extends T> extends Pattern<T, F> {
- FollowedByPattern(final String name, Pattern<T, ?> previous) {
- super(name, previous);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 3ce0c73..cef0f85 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
import org.apache.flink.cep.pattern.conditions.AndCondition;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.OrCondition;
@@ -56,8 +57,8 @@ public class Pattern<T, F extends T> {
/** Window length in which the pattern match has to occur. */
private Time windowTime;
- /** A quantifier for the pattern. By default set to {@link Quantifier#ONE()}. */
- private Quantifier quantifier = Quantifier.ONE();
+ /** A quantifier for the pattern. By default set to {@link Quantifier#ONE(ConsumingStrategy)}. */
+ private Quantifier quantifier = Quantifier.ONE(ConsumingStrategy.STRICT);
/**
* Applicable to a {@code times} pattern, and holds
@@ -70,6 +71,15 @@ public class Pattern<T, F extends T> {
this.previous = previous;
}
+ protected Pattern(
+ final String name,
+ final Pattern<T, ? extends T> previous,
+ final ConsumingStrategy consumingStrategy) {
+ this.name = name;
+ this.previous = previous;
+ this.quantifier = Quantifier.ONE(consumingStrategy);
+ }
+
public Pattern<T, ? extends T> getPrevious() {
return previous;
}
@@ -195,7 +205,19 @@ public class Pattern<T, F extends T> {
* @return A new pattern which is appended to this one
*/
public Pattern<T, T> next(final String name) {
- return new Pattern<T, T>(name, this);
+ return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT);
+ }
+
+ /**
+ * Appends a new pattern to the existing one. The new pattern enforces non-strict
+ * temporal contiguity. This means that a matching event of this pattern and the
+ * preceding matching event might be interleaved with other events which are ignored.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ public Pattern<T, T> followedBy(final String name) {
+ return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_NEXT);
}
/**
@@ -206,8 +228,8 @@ public class Pattern<T, F extends T> {
* @param name Name of the new pattern
* @return A new pattern which is appended to this one
*/
- public FollowedByPattern<T, T> followedBy(final String name) {
- return new FollowedByPattern<T, T>(name, this);
+ public Pattern<T, T> followedByAny(final String name) {
+ return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_ANY);
}
/**
@@ -232,12 +254,12 @@ public class Pattern<T, F extends T> {
* {@code A1 A2 B} appears, this will generate patterns:
* {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
*
- * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} quantifier applied.
+ * @return The same pattern with a {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> oneOrMore() {
checkIfQuantifierApplied();
- this.quantifier = Quantifier.ONE_OR_MORE();
+ this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
return this;
}
@@ -252,14 +274,14 @@ public class Pattern<T, F extends T> {
public Pattern<T, F> times(int times) {
checkIfQuantifierApplied();
Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
- this.quantifier = Quantifier.TIMES();
+ this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
this.times = times;
return this;
}
/**
- * Applicable only to {@link Quantifier#ONE_OR_MORE()} and {@link Quantifier#TIMES()} patterns,
- * this option allows more flexibility to the matching events.
+ * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} and
+ * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
*
* <p>If {@code allowCombinations()} is not applied for a
* pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 0332ed4..b0f882c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -24,48 +24,62 @@ public class Quantifier {
private final EnumSet<QuantifierProperty> properties;
- private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) {
+ private final ConsumingStrategy consumingStrategy;
+
+ private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
+
+ private Quantifier(
+ final ConsumingStrategy consumingStrategy,
+ final QuantifierProperty first,
+ final QuantifierProperty... rest) {
this.properties = EnumSet.of(first, rest);
+ this.consumingStrategy = consumingStrategy;
}
- public static Quantifier ONE() {
- return new Quantifier(QuantifierProperty.SINGLE);
+ public static Quantifier ONE(final ConsumingStrategy consumingStrategy) {
+ return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
}
- public static Quantifier ONE_OR_MORE() {
- return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER);
+ public static Quantifier ONE_OR_MORE(final ConsumingStrategy consumingStrategy) {
+ return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
}
- public static Quantifier TIMES() {
- return new Quantifier(QuantifierProperty.TIMES, QuantifierProperty.EAGER);
+ public static Quantifier TIMES(final ConsumingStrategy consumingStrategy) {
+ return new Quantifier(consumingStrategy, QuantifierProperty.TIMES);
}
public boolean hasProperty(QuantifierProperty property) {
return properties.contains(property);
}
- public void combinations() {
- if (!hasProperty(QuantifierProperty.SINGLE) && !hasProperty(Quantifier.QuantifierProperty.EAGER)) {
- throw new MalformedPatternException("Combinations already allowed!");
- }
+ public ConsumingStrategy getConsumingStrategy() {
+ return consumingStrategy;
+ }
- if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) {
- properties.remove(Quantifier.QuantifierProperty.EAGER);
- } else {
- throw new MalformedPatternException("Combinations not applicable to " + this + "!");
+ public ConsumingStrategy getInnerConsumingStrategy() {
+ return innerConsumingStrategy;
+ }
+
+ private static void checkPattern(boolean condition, Object errorMessage) {
+ if (!condition) {
+ throw new MalformedPatternException(String.valueOf(errorMessage));
}
}
+ public void combinations() {
+ checkPattern(!hasProperty(QuantifierProperty.SINGLE), "Combinations not applicable to " + this + "!");
+ checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "You can apply apply either combinations or consecutive, not both!");
+ checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "Combinations already applied!");
+
+ innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY;
+ }
+
public void consecutive() {
- if (!hasProperty(QuantifierProperty.SINGLE) && hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
- throw new MalformedPatternException("Strict continuity already applied!");
- }
+ checkPattern(hasProperty(QuantifierProperty.LOOPING) || hasProperty(QuantifierProperty.TIMES), "Combinations not applicable to " + this + "!");
+ checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "You can apply apply either combinations or consecutive, not both!");
+ checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "Combinations already applied!");
- if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) {
- properties.add(Quantifier.QuantifierProperty.CONSECUTIVE);
- } else {
- throw new MalformedPatternException("Strict continuity not applicable to " + this + "!");
- }
+ innerConsumingStrategy = ConsumingStrategy.STRICT;
}
public void optional() {
@@ -76,13 +90,21 @@ public class Quantifier {
}
@Override
- public boolean equals(Object obj) {
- return obj instanceof Quantifier && properties.equals(((Quantifier)obj).properties);
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Quantifier that = (Quantifier) o;
+ return Objects.equals(properties, that.properties) &&
+ consumingStrategy == that.consumingStrategy;
}
@Override
public int hashCode() {
- return Objects.hashCode(properties);
+ return Objects.hash(properties, consumingStrategy);
}
/**
@@ -92,9 +114,13 @@ public class Quantifier {
SINGLE,
LOOPING,
TIMES,
- EAGER,
- CONSECUTIVE,
OPTIONAL
}
+ public enum ConsumingStrategy {
+ STRICT,
+ SKIP_TILL_NEXT,
+ SKIP_TILL_ANY
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 3a32175..f62c686 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -96,7 +96,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
return value.getName().equals("start");
}
})
- .followedBy("middle").subtype(SubEvent.class).where(
+ .followedByAny("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
@@ -105,7 +105,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
}
}
)
- .followedBy("end").where(new SimpleCondition<Event>() {
+ .followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -171,7 +171,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
return value.getName().equals("start");
}
})
- .followedBy("middle").subtype(SubEvent.class).where(
+ .followedByAny("middle").subtype(SubEvent.class).where(
new SimpleCondition<SubEvent>() {
@Override
@@ -180,7 +180,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
}
}
)
- .followedBy("end").where(new SimpleCondition<Event>() {
+ .followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -250,13 +250,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
- }).followedBy("middle").where(new SimpleCondition<Event>() {
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("middle");
}
- }).followedBy("end").where(new SimpleCondition<Event>() {
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -339,13 +339,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
- }).followedBy("middle").where(new SimpleCondition<Event>() {
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("middle");
}
- }).followedBy("end").where(new SimpleCondition<Event>() {
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -416,7 +416,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
DataStream<Integer> input = env.fromElements(1, 2);
- Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedBy("end").within(Time.days(1));
+ Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedByAny("end").within(Time.days(1));
DataStream<Integer> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Integer, Integer>() {
@Override
@@ -470,13 +470,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
- }).followedBy("middle").where(new SimpleCondition<Event>() {
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("middle");
}
- }).followedBy("end").where(new SimpleCondition<Event>() {
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -538,7 +538,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
return value.getName().equals("start");
}
})
- .followedBy("middle")
+ .followedByAny("middle")
.where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -551,7 +551,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
return value.getPrice() == 5.0;
}
})
- .followedBy("end").where(new SimpleCondition<Event>() {
+ .followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
@@ -623,13 +623,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
- }).followedBy("middle").where(new SimpleCondition<Event>() {
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("middle");
}
- }).followedBy("end").where(new SimpleCondition<Event>() {
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {