You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/08/22 18:04:23 UTC
flink git commit: [FLINK-7123] [cep] Support timesOrMore in CEP
Repository: flink
Updated Branches:
refs/heads/master 40cec17f4 -> 9995588c8
[FLINK-7123] [cep] Support timesOrMore in CEP
This closes #4523.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9995588c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9995588c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9995588c
Branch: refs/heads/master
Commit: 9995588c83536e04aef3425757a008ba4c78dbde
Parents: 40cec17
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Fri Aug 11 11:29:01 2017 +0800
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Aug 22 18:53:27 2017 +0200
----------------------------------------------------------------------
docs/dev/libs/cep.md | 37 ++
.../flink/cep/scala/pattern/Pattern.scala | 18 +-
.../flink/cep/nfa/compiler/NFACompiler.java | 68 +--
.../org/apache/flink/cep/pattern/Pattern.java | 23 +-
.../apache/flink/cep/pattern/Quantifier.java | 2 +-
.../apache/flink/cep/nfa/TimesOrMoreITCase.java | 562 +++++++++++++++++++
6 files changed, 653 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index fef1967..bddb9b2 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -186,6 +186,13 @@ named `start`, the following are valid quantifiers:
// expecting 0 or more occurrences
start.oneOrMore().optional();
+
+ // expecting 2 or more occurrences
+ start.timesOrMore(2);
+
+ // expecting 0, 2 or more occurrences
+ start.timesOrMore(2).optional();
+
{% endhighlight %}
</div>
@@ -208,6 +215,12 @@ named `start`, the following are valid quantifiers:
// expecting 0 or more occurrences
start.oneOrMore().optional()
+
+ // expecting 2 or more occurrences
+ start.timesOrMore(2)
+
+ // expecting 0, 2 or more occurrences
+ start.timesOrMore(2).optional()
{% endhighlight %}
</div>
</div>
@@ -477,6 +490,18 @@ pattern.oneOrMore();
{% endhighlight %}
</td>
</tr>
+ <tr>
+ <td><strong>timesOrMore(#times)</strong></td>
+ <td>
+ <p>Specifies that this pattern expects at least <strong>#times</strong> occurrences
+ of a matching event.</p>
+ <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+ internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
+{% highlight java %}
+pattern.timesOrMore(2);
+{% endhighlight %}
+ </td>
+ </tr>
<tr>
<td><strong>times(#ofTimes)</strong></td>
<td>
@@ -648,6 +673,18 @@ pattern.oneOrMore()
</td>
</tr>
<tr>
+ <td><strong>timesOrMore(#times)</strong></td>
+ <td>
+ <p>Specifies that this pattern expects at least <strong>#times</strong> occurrences
+ of a matching event.</p>
+ <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
+ internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+{% highlight scala %}
+pattern.timesOrMore(2)
+{% endhighlight %}
+ </td>
+ </tr>
+ <tr>
<td><strong>times(#ofTimes)</strong></td>
<td>
<p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 5daebe0..5b41b90 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -344,7 +344,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
* {{{A1 A2 B}}} appears, this will generate patterns:
* {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
*
- * @return The same pattern with a [[Quantifier.oneOrMore()]] quantifier applied.
+ * @return The same pattern with a [[Quantifier.looping()]] quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def oneOrMore: Pattern[T, F] = {
@@ -378,7 +378,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
}
/**
- * Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
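+ * Specifies that this pattern can occur at least the specified number of times.
+ * This means that at least the specified number of events, and at most an
+ * unbounded number of events, can be matched to this pattern.
+ *
+ * @param times The minimum number of times this pattern must occur.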
+ * @return The same pattern with a [[Quantifier.looping()]] quantifier applied.
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+ */
+ def timesOrMore(times: Int): Pattern[T, F] = {
+ jPattern.timesOrMore(times)
+ this
+ }
+
+ /**
+ * Applicable only to [[Quantifier.looping()]] and [[Quantifier.times()]] patterns,
* this option allows more flexibility to the matching events.
*
* If {{{allowCombinations()}}} is not applied for a
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 9dc1837..4d4baca 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -291,13 +291,9 @@ public class NFACompiler {
final State<T> looping = createLooping(sink);
setCurrentGroupPatternFirstOfLoop(true);
- if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
- lastSink = createInitMandatoryStateOfOneOrMore(looping);
- } else {
- lastSink = createInitOptionalStateOfZeroOrMore(looping, sinkState);
- }
+ lastSink = createTimesState(looping, sinkState, currentPattern.getTimes());
} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
- lastSink = createTimesState(sinkState, currentPattern.getTimes());
+ lastSink = createTimesState(sinkState, sinkState, currentPattern.getTimes());
} else {
lastSink = createSingletonState(sinkState);
}
@@ -407,16 +403,26 @@ public class NFACompiler {
* same {@link IterativeCondition}.
*
* @param sinkState the state that the created state should point to
+ * @param proceedState state that the state being converted should proceed to
* @param times number of times the state should be copied
* @return the first state of the "complex" state, next state should point to it
*/
- private State<T> createTimesState(final State<T> sinkState, Times times) {
+ @SuppressWarnings("unchecked")
+ private State<T> createTimesState(final State<T> sinkState, final State<T> proceedState, Times times) {
State<T> lastSink = sinkState;
setCurrentGroupPatternFirstOfLoop(false);
- final IterativeCondition<T> takeCondition = getTakeCondition(currentPattern);
- final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
+ final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();
+ final IterativeCondition<T> innerIgnoreCondition = extendWithUntilCondition(
+ getInnerIgnoreCondition(currentPattern),
+ untilCondition,
+ false);
+ final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+ getTakeCondition(currentPattern),
+ untilCondition,
+ true);
+
for (int i = times.getFrom(); i < times.getTo(); i++) {
- lastSink = createSingletonState(lastSink, sinkState, takeCondition, innerIgnoreCondition, true);
+ lastSink = createSingletonState(lastSink, proceedState, takeCondition, innerIgnoreCondition, true);
addStopStateToLooping(lastSink);
}
for (int i = 0; i < times.getFrom() - 1; i++) {
@@ -427,7 +433,7 @@ public class NFACompiler {
setCurrentGroupPatternFirstOfLoop(true);
return createSingletonState(
lastSink,
- sinkState,
+ proceedState,
takeCondition,
getIgnoreCondition(currentPattern),
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
@@ -656,46 +662,6 @@ public class NFACompiler {
}
/**
- * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
- * looping state. This method creates the first of the two.
- *
- * @param sinkState the state the newly created state should point to, it should be a looping state
- * @return the newly created state
- */
- @SuppressWarnings("unchecked")
- private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
- final IterativeCondition<T> takeCondition = extendWithUntilCondition(
- getTakeCondition(currentPattern),
- (IterativeCondition<T>) currentPattern.getUntilCondition(),
- true
- );
-
- final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-
- return createSingletonState(sinkState, null, takeCondition, ignoreCondition, false);
- }
-
- /**
- * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
- *
- * @param loopingState the first state of zeroOrMore complex state
- * @param lastSink the state that the looping one points to
- * @return the newly created state
- */
- @SuppressWarnings("unchecked")
- private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
- final IterativeCondition<T> takeCondition = extendWithUntilCondition(
- getTakeCondition(currentPattern),
- (IterativeCondition<T>) currentPattern.getUntilCondition(),
- true
- );
-
- final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
-
- return createSingletonState(loopingState, lastSink, takeCondition, ignoreFunction, true);
- }
-
- /**
* This method extends the given condition with stop(until) condition if necessary.
* The until condition needs to be applied only if both of the given conditions are not null.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2ffbc41..adf1397 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -326,13 +326,14 @@ public class Pattern<T, F extends T> {
* {@code A1 A2 B} appears, this will generate patterns:
* {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
*
- * @return The same pattern with a {@link Quantifier#oneOrMore(ConsumingStrategy)} quantifier applied.
+ * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> oneOrMore() {
checkIfNoNotPattern();
checkIfQuantifierApplied();
- this.quantifier = Quantifier.oneOrMore(quantifier.getConsumingStrategy());
+ this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
+ this.times = Times.of(1);
return this;
}
@@ -375,7 +376,23 @@ public class Pattern<T, F extends T> {
}
/**
- * Applicable only to {@link Quantifier#oneOrMore(ConsumingStrategy)} and
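+ * Specifies that this pattern can occur at least the specified number of times.
+ * This means that at least the specified number of events, and at most an
+ * unbounded number of events, can be matched to this pattern.
+ *
+ * @param times the minimum number of times this pattern must occur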
+ * @return The same pattern with a {@link Quantifier#looping(ConsumingStrategy)} quantifier applied.
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+ */
+ public Pattern<T, F> timesOrMore(int times) {
+ checkIfNoNotPattern();
+ checkIfQuantifierApplied();
+ this.quantifier = Quantifier.looping(quantifier.getConsumingStrategy());
+ this.times = Times.of(times);
+ return this;
+ }
+
+ /**
+ * Applicable only to {@link Quantifier#looping(ConsumingStrategy)} and
* {@link Quantifier#times(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
*
* <p>If {@code allowCombinations()} is not applied for a
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 9192a13..2136706 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -55,7 +55,7 @@ public class Quantifier {
return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
}
- public static Quantifier oneOrMore(final ConsumingStrategy consumingStrategy) {
+ public static Quantifier looping(final ConsumingStrategy consumingStrategy) {
return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9995588c/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
new file mode 100644
index 0000000..4e540dd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for {@link Pattern#timesOrMore(int)}.
+ */
+public class TimesOrMoreITCase extends TestLogger {
+ @Test
+ public void testTimesOrMore() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreNonStrict() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreStrict() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreStrictOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreStrictOptional2() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreNonStrictOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreNonStrictOptional2() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesOrMoreNonStrictOptional3() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ // c a{2,} b
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).timesOrMore(2).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ // Pattern "c a{2,} b": "middle" is attached to "start" via next(), uses
+ // timesOrMore(2).allowCombinations(), and is followed by "end1" via followedBy().
+ // The input interleaves two "f" events, which satisfy none of the three conditions.
+ @Test
+ public void testTimesOrMoreNonStrictWithNext() {
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event a1 = ConsecutiveData.middleEvent1;
+     Event a2 = ConsecutiveData.middleEvent2;
+     Event a3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     // Stream: start, a1, f, a2, f, a3, end with timestamps 1..7.
+     List<StreamRecord<Event>> events = new ArrayList<>();
+     events.add(new StreamRecord<>(start, 1));
+     events.add(new StreamRecord<>(a1, 2));
+     events.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+     events.add(new StreamRecord<>(a2, 4));
+     events.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+     events.add(new StreamRecord<>(a3, 6));
+     events.add(new StreamRecord<>(end, 7));
+
+     // Accepts events named "c".
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("c");
+         }
+     };
+
+     // Accepts events named "a".
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("a");
+         }
+     };
+
+     // Accepts events named "b".
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .next("middle").where(isMiddle).timesOrMore(2).allowCombinations()
+         .followedBy("end1").where(isEnd);
+
+     NFA<Event> compiledNfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+     List<List<Event>> matches = feedNFA(events, compiledNfa);
+
+     // Every expected match takes a1 as its first "a" event.
+     compareMaps(matches, Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, a1, a2, a3, end),
+         Lists.newArrayList(start, a1, a2, end),
+         Lists.newArrayList(start, a1, a3, end)
+     ));
+ }
+
+ // Pattern "c a{2,} b": "middle" is attached to "start" via followedBy(), uses
+ // timesOrMore(2) without allowCombinations(), and is followed by "end1" via followedBy().
+ @Test
+ public void testTimesOrMoreNotStrictWithFollowedBy() {
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event a1 = ConsecutiveData.middleEvent1;
+     Event a2 = ConsecutiveData.middleEvent2;
+     Event a3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     // Stream: start, a1, a2, a3, end with timestamps 1, 2, 4, 6, 7.
+     List<StreamRecord<Event>> events = new ArrayList<>();
+     events.add(new StreamRecord<>(start, 1));
+     events.add(new StreamRecord<>(a1, 2));
+     events.add(new StreamRecord<>(a2, 4));
+     events.add(new StreamRecord<>(a3, 6));
+     events.add(new StreamRecord<>(end, 7));
+
+     // Accepts events named "c".
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("c");
+         }
+     };
+
+     // Accepts events named "a".
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("a");
+         }
+     };
+
+     // Accepts events named "b".
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .followedBy("middle").where(isMiddle).timesOrMore(2)
+         .followedBy("end1").where(isEnd);
+
+     NFA<Event> compiledNfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+     List<List<Event>> matches = feedNFA(events, compiledNfa);
+
+     // Two matches are expected: the "a" runs (a1, a2, a3) and (a1, a2).
+     compareMaps(matches, Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, a1, a2, a3, end),
+         Lists.newArrayList(start, a1, a2, end)
+     ));
+ }
+
+ // Pattern "c a{2,} b": "middle" is attached to "start" via followedByAny(), uses
+ // timesOrMore(2).allowCombinations(), and is followed by "end1" via followedBy().
+ @Test
+ public void testTimesOrMoreNotStrictWithFollowedByAny() {
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event a1 = ConsecutiveData.middleEvent1;
+     Event a2 = ConsecutiveData.middleEvent2;
+     Event a3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     // Stream: start, a1, a2, a3, end with timestamps 1, 2, 4, 6, 7.
+     List<StreamRecord<Event>> events = new ArrayList<>();
+     events.add(new StreamRecord<>(start, 1));
+     events.add(new StreamRecord<>(a1, 2));
+     events.add(new StreamRecord<>(a2, 4));
+     events.add(new StreamRecord<>(a3, 6));
+     events.add(new StreamRecord<>(end, 7));
+
+     // Accepts events named "c".
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("c");
+         }
+     };
+
+     // Accepts events named "a".
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("a");
+         }
+     };
+
+     // Accepts events named "b".
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event value) throws Exception {
+             return value.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .followedByAny("middle").where(isMiddle).timesOrMore(2).allowCombinations()
+         .followedBy("end1").where(isEnd);
+
+     NFA<Event> compiledNfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+     List<List<Event>> matches = feedNFA(events, compiledNfa);
+
+     // Expected: all three "a" events, plus each pair of them, between start and end.
+     compareMaps(matches, Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, a1, a2, a3, end),
+         Lists.newArrayList(start, a1, a2, end),
+         Lists.newArrayList(start, a2, a3, end),
+         Lists.newArrayList(start, a1, a3, end)
+     ));
+ }
+
+ // Holder for the fixture events used by the tests above: one event named "c", three
+ // events named "a" and one event named "b".
+ // NOTE(review): the Event constructor arguments look like (id, name, price); confirm
+ // against the Event class.
+ private static class ConsecutiveData {
+
+     // Private constructor; the tests shown here only read the static fields below.
+     private ConsecutiveData() {
+     }
+
+     // Matches the "start" condition (name "c").
+     private static final Event startEvent = new Event(40, "c", 1.0);
+
+     // Match the "middle" condition (name "a").
+     private static final Event middleEvent1 = new Event(41, "a", 2.0);
+     private static final Event middleEvent2 = new Event(42, "a", 3.0);
+     private static final Event middleEvent3 = new Event(43, "a", 4.0);
+
+     // Matches the "end1" condition (name "b").
+     private static final Event end = new Event(44, "b", 5.0);
+ }
+}