You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/06/20 06:41:53 UTC
flink git commit: [FLINK-6904] [cep] Support for quantifier range to
CEP's pattern API
Repository: flink
Updated Branches:
refs/heads/finish-pr-4121 [created] 8835da996
[FLINK-6904] [cep] Support for quantifier range to CEP's pattern API
This closes #4121
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8835da99
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8835da99
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8835da99
Branch: refs/heads/finish-pr-4121
Commit: 8835da996f61222ef7291d73b5160e0405d81b16
Parents: 6cf6cb8
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Wed Jun 14 11:19:41 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jun 20 08:34:37 2017 +0200
----------------------------------------------------------------------
.../flink/cep/scala/pattern/Pattern.scala | 13 +
.../flink/cep/nfa/compiler/NFACompiler.java | 49 +-
.../org/apache/flink/cep/pattern/Pattern.java | 27 +-
.../apache/flink/cep/pattern/Quantifier.java | 33 ++
.../org/apache/flink/cep/nfa/NFAITCase.java | 1 -
.../apache/flink/cep/nfa/TimesRangeITCase.java | 564 +++++++++++++++++++
6 files changed, 657 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 3a30836..270b2f5 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -316,6 +316,19 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
}
/**
+ * Specifies that the pattern can occur between from and to times.
+ *
+ * @param from number of times matching event must appear at least
+ * @param to number of times matching event must appear at most
+ * @return The same pattern with the number of times range applied
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+ */
+ def times(from: Int, to: Int): Pattern[T, F] = {
+ jPattern.times(from, to)
+ this
+ }
+
+ /**
* Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
* this option allows more flexibility to the matching events.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index ce42acd..b5a437b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -28,6 +28,7 @@ import org.apache.flink.cep.nfa.StateTransitionAction;
import org.apache.flink.cep.pattern.MalformedPatternException;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.Times;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.NotCondition;
@@ -372,33 +373,23 @@ public class NFACompiler {
* @param times number of times the state should be copied
* @return the first state of the "complex" state, next state should point to it
*/
- private State<T> createTimesState(final State<T> sinkState, int times) {
- State<T> lastSink = copyWithoutTransitiveNots(sinkState);
- for (int i = 0; i < times - 1; i++) {
- lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
+ private State<T> createTimesState(final State<T> sinkState, Times times) {
+ State<T> lastSink = sinkState;
+ final IterativeCondition<T> innerIgnoreCondition = getInnerIgnoreCondition(currentPattern);
+ for (int i = times.getFrom(); i < times.getTo(); i++) {
+ lastSink = createSingletonState(lastSink, sinkState, innerIgnoreCondition, true);
addStopStateToLooping(lastSink);
}
-
- final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
- final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
-
- // we created the intermediate states in the loop, now we create the start of the loop.
- if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
- return createSingletonState(lastSink, ignoreCondition, false);
- }
-
- final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
- singletonState.addTake(lastSink, currentCondition);
- singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
-
- if (ignoreCondition != null) {
- State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
- ignoreState.addTake(lastSink, currentCondition);
- ignoreState.addIgnore(ignoreCondition);
- singletonState.addIgnore(ignoreState, ignoreCondition);
- addStopStates(ignoreState);
+ for (int i = 0; i < times.getFrom() - 1; i++) {
+ lastSink = createSingletonState(lastSink, null, innerIgnoreCondition, false);
+ addStopStateToLooping(lastSink);
}
- return singletonState;
+ // we created the intermediate states in the loop, now we create the start of the loop.
+ return createSingletonState(
+ lastSink,
+ sinkState,
+ getIgnoreCondition(currentPattern),
+ currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
}
/**
@@ -413,6 +404,7 @@ public class NFACompiler {
private State<T> createSingletonState(final State<T> sinkState) {
return createSingletonState(
sinkState,
+ sinkState,
getIgnoreCondition(currentPattern),
currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
}
@@ -424,10 +416,15 @@ public class NFACompiler {
*
* @param ignoreCondition condition that should be applied to IGNORE transition
* @param sinkState state that the state being converted should point to
+ * @param proceedState state that the state being converted should proceed to
+ * @param isOptional whether the state being converted is optional
* @return the created state
*/
@SuppressWarnings("unchecked")
- private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
+ private State<T> createSingletonState(final State<T> sinkState,
+ final State<T> proceedState,
+ final IterativeCondition<T> ignoreCondition,
+ final boolean isOptional) {
final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
@@ -438,7 +435,7 @@ public class NFACompiler {
if (isOptional) {
// if no element accepted the previous nots are still valid.
- singletonState.addProceed(sinkState, trueFunction);
+ singletonState.addProceed(proceedState, trueFunction);
}
if (ignoreCondition != null) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2676994..8767a94 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.pattern;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
+import org.apache.flink.cep.pattern.Quantifier.Times;
import org.apache.flink.cep.pattern.conditions.AndCondition;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.OrCondition;
@@ -64,7 +65,7 @@ public class Pattern<T, F extends T> {
* Applicable to a {@code times} pattern, and holds
* the number of times it has to appear.
*/
- private int times;
+ private Times times;
protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
this.name = name;
@@ -84,7 +85,7 @@ public class Pattern<T, F extends T> {
return previous;
}
- public int getTimes() {
+ public Times getTimes() {
return times;
}
@@ -318,7 +319,27 @@ public class Pattern<T, F extends T> {
checkIfQuantifierApplied();
Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
- this.times = times;
+ this.times = Times.of(times);
+ return this;
+ }
+
+ /**
+ * Specifies that the pattern can occur between {@code from} and {@code to} times (both inclusive).
+ * A {@code from} of 0 marks the pattern optional and is stored as the range 1..to, so at most {@code to} events match.
+ * @param from number of times matching event must appear at least
+ * @param to number of times matching event must appear at most
+ * @return The same pattern with the number of times range applied
+ *
+ * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
+ */
+ public Pattern<T, F> times(int from, int to) {
+ checkIfNoNotPattern();
+ checkIfQuantifierApplied();
+ this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
+ if (from == 0) {
+ this.quantifier.optional();
+ }
+ this.times = Times.of((from == 0 && to > 0) ? 1 : from, to);
return this;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index efc7cf4..504fec0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.pattern;
+import org.apache.flink.util.Preconditions;
+
import java.util.EnumSet;
import java.util.Objects;
@@ -143,4 +145,35 @@ public class Quantifier {
NOT_NEXT
}
+ /**
+ * Describes how many times a {@link Pattern} may occur, as a validated range with {@code 0 <= from <= to} and {@code to > 0}.
+ */
+ public static class Times {
+ private final int from;
+ private final int to;
+
+ private Times(int from, int to) {
+ Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+ Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
+ Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
+ this.from = from;
+ this.to = to;
+ }
+
+ public int getFrom() {
+ return from;
+ }
+
+ public int getTo() {
+ return to;
+ }
+
+ public static Times of(int from, int to) {
+ return new Times(from, to);
+ }
+
+ public static Times of(int times) {
+ return new Times(times, times);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 20cb482..506587b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -1904,7 +1904,6 @@ public class NFAITCase extends TestLogger {
Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
));
}
-
/////////////////////////////// Consecutive ////////////////////////////////////////
private static class ConsecutiveData {
http://git-wip-us.apache.org/repos/asf/flink/blob/8835da99/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
new file mode 100644
index 0000000..4305fa2
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -0,0 +1,564 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for {@link Pattern#times(int, int)}.
+ */
+@SuppressWarnings("unchecked")
+public class TimesRangeITCase extends TestLogger {
+
+ @Test
+ public void testTimesRange() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, end1)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrict() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeStrict() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(0, 3).consecutive().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeStrictOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeStrictOptional1() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).consecutive().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictOptional1() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictOptional2() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ @Test
+ public void testTimesRangeNonStrictOptional3() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
+ Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
+ ));
+ }
+
+ /**
+  * Checks the matches produced for a pattern built as
+  * {@code begin("start").next("middle").times(2, 3).allowCombinations().followedBy("end1")}.
+  *
+  * <p>The input interleaves the three "a" fixture events with two unrelated "f" events.
+  */
+ @Test
+ public void testTimesRangeNonStrictWithNext() {
+     // Name-based conditions, one per pattern stage.
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("c");
+         }
+     };
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("a");
+         }
+     };
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .next("middle").where(isMiddle).times(2, 3).allowCombinations()
+         .followedBy("end1").where(isEnd);
+
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event middle1 = ConsecutiveData.middleEvent1;
+     Event middle2 = ConsecutiveData.middleEvent2;
+     Event middle3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     // Timestamps 3 and 5 carry "f" events that none of the conditions accept.
+     List<StreamRecord<Event>> records = new ArrayList<>();
+     records.add(new StreamRecord<>(start, 1));
+     records.add(new StreamRecord<>(middle1, 2));
+     records.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
+     records.add(new StreamRecord<>(middle2, 4));
+     records.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
+     records.add(new StreamRecord<>(middle3, 6));
+     records.add(new StreamRecord<>(end, 7));
+
+     List<List<Event>> expectedMatches = Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, middle1, middle2, middle3, end),
+         Lists.newArrayList(start, middle1, middle2, end),
+         Lists.newArrayList(start, middle1, middle3, end)
+     );
+
+     NFA<Event> compiled = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+     List<List<Event>> actualMatches = feedNFA(records, compiled);
+
+     compareMaps(actualMatches, expectedMatches);
+ }
+
+ /**
+  * Checks the matches produced for a pattern built as
+  * {@code begin("start").followedBy("middle").times(2, 3).followedBy("end1")}
+  * when the start event, the three "a" fixture events and the end event arrive in order.
+  */
+ @Test
+ public void testTimesRangeNotStrictWithFollowedBy() {
+     // Name-based conditions, one per pattern stage.
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("c");
+         }
+     };
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("a");
+         }
+     };
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .followedBy("middle").where(isMiddle).times(2, 3)
+         .followedBy("end1").where(isEnd);
+
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event middle1 = ConsecutiveData.middleEvent1;
+     Event middle2 = ConsecutiveData.middleEvent2;
+     Event middle3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     List<StreamRecord<Event>> records = new ArrayList<>();
+     records.add(new StreamRecord<>(start, 1));
+     records.add(new StreamRecord<>(middle1, 2));
+     records.add(new StreamRecord<>(middle2, 4));
+     records.add(new StreamRecord<>(middle3, 6));
+     records.add(new StreamRecord<>(end, 7));
+
+     List<List<Event>> expectedMatches = Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, middle1, middle2, middle3, end),
+         Lists.newArrayList(start, middle1, middle2, end)
+     );
+
+     NFA<Event> compiled = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+     List<List<Event>> actualMatches = feedNFA(records, compiled);
+
+     compareMaps(actualMatches, expectedMatches);
+ }
+
+ /**
+  * Checks the matches produced for a pattern built as
+  * {@code begin("start").followedByAny("middle").times(2, 3).allowCombinations().followedBy("end1")}
+  * when the start event, the three "a" fixture events and the end event arrive in order.
+  */
+ @Test
+ public void testTimesRangeNotStrictWithFollowedByAny() {
+     // Name-based conditions, one per pattern stage.
+     SimpleCondition<Event> isStart = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("c");
+         }
+     };
+     SimpleCondition<Event> isMiddle = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("a");
+         }
+     };
+     SimpleCondition<Event> isEnd = new SimpleCondition<Event>() {
+         private static final long serialVersionUID = 5726188262756267490L;
+
+         @Override
+         public boolean filter(Event event) throws Exception {
+             return event.getName().equals("b");
+         }
+     };
+
+     Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(isStart)
+         .followedByAny("middle").where(isMiddle).times(2, 3).allowCombinations()
+         .followedBy("end1").where(isEnd);
+
+     // Short aliases for the shared fixture events.
+     Event start = ConsecutiveData.startEvent;
+     Event middle1 = ConsecutiveData.middleEvent1;
+     Event middle2 = ConsecutiveData.middleEvent2;
+     Event middle3 = ConsecutiveData.middleEvent3;
+     Event end = ConsecutiveData.end;
+
+     List<StreamRecord<Event>> records = new ArrayList<>();
+     records.add(new StreamRecord<>(start, 1));
+     records.add(new StreamRecord<>(middle1, 2));
+     records.add(new StreamRecord<>(middle2, 4));
+     records.add(new StreamRecord<>(middle3, 6));
+     records.add(new StreamRecord<>(end, 7));
+
+     List<List<Event>> expectedMatches = Lists.<List<Event>>newArrayList(
+         Lists.newArrayList(start, middle1, middle2, middle3, end),
+         Lists.newArrayList(start, middle1, middle2, end),
+         Lists.newArrayList(start, middle2, middle3, end),
+         Lists.newArrayList(start, middle1, middle3, end)
+     );
+
+     NFA<Event> compiled = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+     List<List<Event>> actualMatches = feedNFA(records, compiled);
+
+     compareMaps(actualMatches, expectedMatches);
+ }
+
+ /**
+  * Holder for the {@link Event} fixtures shared by the tests above: one event created
+  * with the string "c", three with "a" and one with "b". The test conditions select
+  * events by comparing {@code getName()} against these strings.
+  */
+ private static class ConsecutiveData {
+ // Event accepted by the "start" conditions (name "c").
+ private static final Event startEvent = new Event(40, "c", 1.0);
+ // Three distinct events accepted by the "middle" conditions (name "a").
+ private static final Event middleEvent1 = new Event(41, "a", 2.0);
+ private static final Event middleEvent2 = new Event(42, "a", 3.0);
+ private static final Event middleEvent3 = new Event(43, "a", 4.0);
+ // Event accepted by the "end1" conditions (name "b").
+ private static final Event end = new Event(44, "b", 5.0);
+
+ // Private no-op constructor; the class only exposes static fields.
+ private ConsecutiveData() {
+ }
+ }
+}