You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/05 12:02:01 UTC
[2/2] flink git commit: [FLINK-3320] Add NOT pattern support to CEP's
pattern API
[FLINK-3320] Add NOT pattern support to CEP's pattern API
NOT patterns are not yet supported when an OPTIONAL
pattern directly preceeds a NOT. In these cases, an
exception is thrown that proposes an alternative
(but not the most efficient) way to support these
patterns.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5795ebe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5795ebe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5795ebe1
Branch: refs/heads/master
Commit: 5795ebe187d6e6dce2a52ec2d245071d101437e7
Parents: d7364ff
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Fri Apr 28 15:24:37 2017 +0200
Committer: kl0u <kk...@gmail.com>
Committed: Fri May 5 13:59:36 2017 +0200
----------------------------------------------------------------------
.../flink/cep/scala/pattern/Pattern.scala | 24 +
.../apache/flink/cep/nfa/ComputationState.java | 4 +
.../main/java/org/apache/flink/cep/nfa/NFA.java | 62 +-
.../java/org/apache/flink/cep/nfa/State.java | 13 +-
.../flink/cep/nfa/compiler/NFACompiler.java | 321 ++++--
.../AbstractKeyedCEPPatternOperator.java | 1 -
.../org/apache/flink/cep/pattern/Pattern.java | 42 +-
.../apache/flink/cep/pattern/Quantifier.java | 12 +-
.../org/apache/flink/cep/nfa/NFAITCase.java | 1056 +++++++++++++++++-
.../flink/cep/nfa/compiler/NFACompilerTest.java | 35 +
.../apache/flink/cep/pattern/PatternTest.java | 76 +-
11 files changed, 1497 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 65b7ab0..fe7a30c 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -201,6 +201,17 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
}
/**
+ * Appends a new pattern to the existing one. The new pattern enforces that there is no event
+ * matching this pattern right after the preceding matched event.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ def notNext(name: String): Pattern[T, T] = {
+ Pattern[T, T](jPattern.notNext(name))
+ }
+
+ /**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
@@ -213,6 +224,19 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
}
/**
+ * Appends a new pattern to the existing one. The new pattern enforces that there is no event
+ * matching this pattern between the preceding pattern and succeeding this one.
+ *
+ * NOTE: There has to be other pattern after this one.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ def notFollowedBy(name : String) {
+ Pattern[T, T](jPattern.notFollowedBy(name))
+ }
+
+ /**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 80227fc..08b9b78 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -127,6 +127,10 @@ public class ComputationState<T> {
return new ComputationState<>(nfa, currentState, previousState, event, timestamp, version, startTimestamp);
}
+ public boolean isStopState() {
+ return state.isStop();
+ }
+
/**
* The context used when evaluating this computation state.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 98c1fc9..b8c4e65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -21,21 +21,6 @@ package org.apache.flink.cep.nfa;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedHashMultimap;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
-import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
-import org.apache.flink.cep.NonDuplicatingTypeSerializer;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -58,6 +43,20 @@ import java.util.Set;
import java.util.Stack;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
+import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
+import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
/**
* Non-deterministic finite automaton implementation.
@@ -180,6 +179,9 @@ public class NFA<T> implements Serializable {
* resulting event sequences are returned. If computations time out and timeout handling is
* activated, then the timed out event patterns are returned.
*
+ * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
+ * with the element that resulted in the stop state.
+ *
* @param event The current event to be processed or null if only pruning shall be done
* @param timestamp The timestamp of the current event
* @return Tuple of the collection of matched patterns (e.g. the result of computations which have
@@ -222,7 +224,12 @@ public class NFA<T> implements Serializable {
newComputationStates = Collections.singleton(computationState);
}
- for (ComputationState<T> newComputationState: newComputationStates) {
+
+ //delay adding new computation states in case a stop state is reached and we discard the path.
+ final Collection<ComputationState<T>> statesToRetain = new ArrayList<>();
+ //if stop state reached in this path
+ boolean shouldDiscardPath = false;
+ for (final ComputationState<T> newComputationState: newComputationStates) {
if (newComputationState.isFinalState()) {
// we've reached a final state and can thus retrieve the matching event sequence
Collection<Map<String, T>> matches = extractPatternMatches(newComputationState);
@@ -233,11 +240,32 @@ public class NFA<T> implements Serializable {
newComputationState.getPreviousState().getName(),
newComputationState.getEvent(),
newComputationState.getTimestamp());
+ } else if (newComputationState.isStopState()) {
+ //reached stop state. release entry for the stop state
+ shouldDiscardPath = true;
+ stringSharedBuffer.release(
+ newComputationState.getPreviousState().getName(),
+ newComputationState.getEvent(),
+ newComputationState.getTimestamp());
} else {
// add new computation state; it will be processed once the next event arrives
- computationStates.add(newComputationState);
+ statesToRetain.add(newComputationState);
}
}
+
+ if (shouldDiscardPath) {
+ // a stop state was reached in this branch. release branch which results in removing previous event from
+ // the buffer
+ for (final ComputationState<T> state : statesToRetain) {
+ stringSharedBuffer.release(
+ state.getPreviousState().getName(),
+ state.getEvent(),
+ state.getTimestamp());
+ }
+ } else {
+ computationStates.addAll(statesToRetain);
+ }
+
}
// prune shared buffer based on window length
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 2503ffd..275266b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -51,6 +51,10 @@ public class State<T> implements Serializable {
stateTransitions = new ArrayList<>();
}
+ public StateType getStateType() {
+ return stateType;
+ }
+
public boolean isFinal() {
return stateType == StateType.Final;
}
@@ -69,7 +73,7 @@ public class State<T> implements Serializable {
this.stateType = StateType.Start;
}
- private void addStateTransition(
+ public void addStateTransition(
final StateTransitionAction action,
final State<T> targetState,
final IterativeCondition<T> condition) {
@@ -130,13 +134,18 @@ public class State<T> implements Serializable {
return Objects.hash(name, stateType, stateTransitions);
}
+ public boolean isStop() {
+ return stateType == StateType.Stop;
+ }
+
/**
* Set of valid state types.
*/
public enum StateType {
Start, // the state is a starting state for the NFA
Final, // the state is a final state for the NFA
- Normal // the state is neither a start nor a final state
+ Normal, // the state is neither a start nor a final state
+ Stop
}
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 0ca0e14..39c18b9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -32,6 +32,7 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.nfa.StateTransition;
@@ -101,13 +102,15 @@ public class NFACompiler {
*
* @param <T>
*/
- private static class NFAFactoryCompiler<T> {
+ static class NFAFactoryCompiler<T> {
private final Set<String> usedNames = new HashSet<>();
+ private final Map<String, State<T>> stopStates = new HashMap<>();
private final List<State<T>> states = new ArrayList<>();
private long windowTime = 0;
private Pattern<T, ?> currentPattern;
+ private Pattern<T, ?> followingPattern;
NFAFactoryCompiler(final Pattern<T, ?> pattern) {
this.currentPattern = pattern;
@@ -118,6 +121,10 @@ public class NFACompiler {
* multiple NFAs.
*/
void compileFactory() {
+ if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+ throw new MalformedPatternException("NotFollowedBy is not supported as a last part of a Pattern!");
+ }
+
// we're traversing the pattern from the end to the beginning --> the first state is the final state
State<T> sinkState = createEndingState();
// add all the normal states
@@ -135,13 +142,45 @@ public class NFACompiler {
}
/**
+ * Retrieves list of conditions resulting in Stop state and names of the corresponding NOT patterns.
+ *
+ * <p>A current not condition can be produced in two cases:
+ * <ol>
+ * <li>the previous pattern is a {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li>
+ * <li>exists a backward path of {@link Quantifier.QuantifierProperty#OPTIONAL} patterns to
+ * {@link Quantifier.ConsumingStrategy#NOT_FOLLOW}</li>
+ * </ol>
+ *
+ * <p><b>WARNING:</b> for more info on the second case see: {@link NFAFactoryCompiler#copyWithoutTransitiveNots(State)}
+ *
+ * @return list of not conditions with corresponding names
+ */
+ private List<Tuple2<IterativeCondition<T>, String>> getCurrentNotCondition() {
+ List<Tuple2<IterativeCondition<T>, String>> notConditions = new ArrayList<>();
+
+ Pattern<T, ? extends T> previousPattern = currentPattern;
+ while (previousPattern.getPrevious() != null && (
+ previousPattern.getPrevious().getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL) ||
+ previousPattern.getPrevious().getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW)) {
+
+ previousPattern = previousPattern.getPrevious();
+
+ if (previousPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+ final IterativeCondition<T> notCondition = (IterativeCondition<T>) previousPattern.getCondition();
+ notConditions.add(Tuple2.of(notCondition, previousPattern.getName()));
+ }
+ }
+ return notConditions;
+ }
+
+ /**
* Creates the dummy Final {@link State} of the NFA graph.
* @return dummy Final state
*/
private State<T> createEndingState() {
+ checkPatternNameUniqueness(ENDING_STATE_NAME);
State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
states.add(endState);
- usedNames.add(ENDING_STATE_NAME);
windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
return endState;
@@ -154,10 +193,30 @@ public class NFACompiler {
* @return the next state after Start in the resulting graph
*/
private State<T> createMiddleStates(final State<T> sinkState) {
-
State<T> lastSink = sinkState;
while (currentPattern.getPrevious() != null) {
- lastSink = convertPattern(lastSink);
+
+ if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+ //skip notFollow patterns, they are converted into edge conditions
+ } else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
+ final State<T> notNext = createNormalState();
+ final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
+ final State<T> stopState = createStopState(notCondition, currentPattern.getName());
+
+ if (lastSink.isFinal()) {
+ //so that the proceed to final is not fired
+ notNext.addIgnore(lastSink, new NotCondition<>(notCondition));
+ } else {
+ notNext.addProceed(lastSink, new NotCondition<>(notCondition));
+ }
+ notNext.addProceed(stopState, notCondition);
+ lastSink = notNext;
+ } else {
+ lastSink = convertPattern(lastSink);
+ }
+
+ // we traverse the pattern graph backwards
+ followingPattern = currentPattern;
currentPattern = currentPattern.getPrevious();
final Time currentWindowTime = currentPattern.getWindowTime();
@@ -166,79 +225,152 @@ public class NFACompiler {
windowTime = currentWindowTime.toMilliseconds();
}
}
-
return lastSink;
}
+ /**
+ * Creates the Start {@link State} of the resulting NFA graph.
+ *
+ * @param sinkState the state that Start state should point to (always first state of middle states)
+ * @return created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createStartState(State<T> sinkState) {
+ final State<T> beginningState = convertPattern(sinkState);
+ beginningState.makeStart();
+ return beginningState;
+ }
+
private State<T> convertPattern(final State<T> sinkState) {
final State<T> lastSink;
- checkPatternNameUniqueness();
- usedNames.add(currentPattern.getName());
+ checkPatternNameUniqueness(currentPattern.getName());
final Quantifier quantifier = currentPattern.getQuantifier();
if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
- final State<T> looping = createLooping(sinkState);
+
+ // if loop has started then all notPatterns previous to the optional states are no longer valid
+ final State<T> sink = copyWithoutTransitiveNots(sinkState);
+ final State<T> looping = createLooping(sink);
if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
- lastSink = createFirstMandatoryStateOfLoop(looping);
+ lastSink = createInitMandatoryStateOfOneOrMore(looping);
} else {
- lastSink = createWaitingStateForZeroOrMore(looping, sinkState);
+ lastSink = createInitOptionalStateOfZeroOrMore(looping, sinkState);
}
} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
lastSink = createTimesState(sinkState, currentPattern.getTimes());
} else {
lastSink = createSingletonState(sinkState);
}
+ addStopStates(lastSink);
return lastSink;
}
/**
- * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+ * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
+ * Should be used instead of instantiating with new operator.
*
- * @param loopingState the first state of zeroOrMore complex state
- * @param lastSink the state that the looping one points to
- * @return the newly created state
+ * @return the created state
*/
- @SuppressWarnings("unchecked")
- private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
- final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
+ private State<T> createNormalState() {
+ final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(state);
+ return state;
+ }
- final State<T> followByState = createNormalState();
- followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
- followByState.addTake(loopingState, currentFunction);
+ private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
+ // We should not duplicate the notStates. All states from which we can stop should point to the same one.
+ State<T> stopState = stopStates.get(name);
+ if (stopState == null) {
+ stopState = new State<>(name, State.StateType.Stop);
+ states.add(stopState);
+ stopState.addTake(notCondition);
+ stopStates.put(name, stopState);
+ }
+ return stopState;
+ }
- final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
- if (ignoreFunction != null) {
- final State<T> followByStateWithoutProceed = createNormalState();
- followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
- followByStateWithoutProceed.addIgnore(ignoreFunction);
- followByStateWithoutProceed.addTake(loopingState, currentFunction);
+ /**
+ * This method creates an alternative state that is target for TAKE transition from an optional State.
+ * Accepting an event in optional State discards all not Patterns that were present before it.
+ *
+ * <p>E.g for a Pattern begin("a").notFollowedBy("b").followedByAny("c").optional().followedByAny("d")
+ * a sequence like : {a c b d} is a valid match, but {a b d} is not.
+ *
+ * <p><b>NOTICE:</b> This method creates copy only if it necessary.
+ *
+ * @param sinkState a state to create copy without transitive nots
+ * @return the copy of the state itself if no modifications were needed
+ */
+ private State<T> copyWithoutTransitiveNots(final State<T> sinkState) {
+ final List<Tuple2<IterativeCondition<T>, String>> currentNotCondition = getCurrentNotCondition();
+
+ if (currentNotCondition.isEmpty() ||
+ !currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+ //we do not create an alternative path if we are NOT in an OPTIONAL state or there is no NOTs prior to
+ //the optional state
+ return sinkState;
}
- return followByState;
+ final State<T> copyOfSink = new State<>(sinkState.getName(), sinkState.getStateType());
+ states.add(copyOfSink);
+
+ for (StateTransition<T> tStateTransition : sinkState.getStateTransitions()) {
+
+ if (tStateTransition.getAction() == StateTransitionAction.PROCEED) {
+ State<T> targetState = tStateTransition.getTargetState();
+ boolean remove = false;
+ if (targetState.isStop()) {
+ for (Tuple2<IterativeCondition<T>, String> notCondition : currentNotCondition) {
+ if (targetState.getName().equals(notCondition.f1)) {
+ remove = true;
+ }
+ }
+ } else {
+ targetState = copyWithoutTransitiveNots(tStateTransition.getTargetState());
+ }
+
+ if (!remove) {
+ copyOfSink.addStateTransition(tStateTransition.getAction(), targetState, tStateTransition.getCondition());
+ }
+ } else {
+ copyOfSink.addStateTransition(
+ tStateTransition.getAction(),
+ tStateTransition.getTargetState().equals(tStateTransition.getSourceState())
+ ? copyOfSink
+ : tStateTransition.getTargetState(),
+ tStateTransition.getCondition()
+ );
+ }
+
+ }
+ return copyOfSink;
}
- private void checkPatternNameUniqueness() {
- if (usedNames.contains(currentPattern.getName())) {
- throw new MalformedPatternException(
- "Duplicate pattern name: " + currentPattern.getName() + ". " +
- "Pattern names must be unique.");
+ private void addStopStates(final State<T> state) {
+ for (Tuple2<IterativeCondition<T>, String> notCondition: getCurrentNotCondition()) {
+ final State<T> stopState = createStopState(notCondition.f0, notCondition.f1);
+ state.addProceed(stopState, notCondition.f0);
}
}
- /**
- * Creates the Start {@link State} of the resulting NFA graph.
- *
- * @param sinkState the state that Start state should point to (always first state of middle states)
- * @return created state
- */
- @SuppressWarnings("unchecked")
- private State<T> createStartState(State<T> sinkState) {
- final State<T> beginningState = convertPattern(sinkState);
- beginningState.makeStart();
+ private void addStopStateToLooping(final State<T> loopingState) {
+ if (followingPattern != null &&
+ followingPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
+ final IterativeCondition<T> notCondition = (IterativeCondition<T>) followingPattern.getCondition();
+ final State<T> stopState = createStopState(notCondition, followingPattern.getName());
+ loopingState.addProceed(stopState, notCondition);
+ }
+ }
- return beginningState;
+ private void checkPatternNameUniqueness(String patternName) {
+ if (usedNames.contains(currentPattern.getName())) {
+ throw new MalformedPatternException(
+ "Duplicate pattern name: " + patternName + ". " +
+ "Pattern names must be unique.");
+ }
+ usedNames.add(patternName);
}
/**
@@ -250,12 +382,13 @@ public class NFACompiler {
* @return the first state of the "complex" state, next state should point to it
*/
private State<T> createTimesState(final State<T> sinkState, int times) {
- State<T> lastSink = sinkState;
+ State<T> lastSink = copyWithoutTransitiveNots(sinkState);
for (int i = 0; i < times - 1; i++) {
lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
+ addStopStateToLooping(lastSink);
}
- final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
// we created the intermediate states in the loop, now we create the start of the loop.
@@ -264,14 +397,15 @@ public class NFACompiler {
}
final State<T> singletonState = createNormalState();
- singletonState.addTake(lastSink, currentFilterFunction);
+ singletonState.addTake(lastSink, currentCondition);
singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
if (ignoreCondition != null) {
State<T> ignoreState = createNormalState();
- ignoreState.addTake(lastSink, currentFilterFunction);
+ ignoreState.addTake(lastSink, currentCondition);
ignoreState.addIgnore(ignoreCondition);
singletonState.addIgnore(ignoreState, ignoreCondition);
+ addStopStates(ignoreState);
}
return singletonState;
}
@@ -303,13 +437,16 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
- final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
final State<T> singletonState = createNormalState();
- singletonState.addTake(sinkState, currentFilterFunction);
+ // if event is accepted then all notPatterns previous to the optional states are no longer valid
+ final State<T> sink = copyWithoutTransitiveNots(sinkState);
+ singletonState.addTake(sink, currentCondition);
if (isOptional) {
+ // if no element accepted the previous nots are still valid.
singletonState.addProceed(sinkState, trueFunction);
}
@@ -317,7 +454,9 @@ public class NFACompiler {
final State<T> ignoreState;
if (isOptional) {
ignoreState = createNormalState();
- ignoreState.addTake(sinkState, currentFilterFunction);
+ ignoreState.addTake(sink, currentCondition);
+ ignoreState.addIgnore(ignoreCondition);
+ addStopStates(ignoreState);
} else {
ignoreState = singletonState;
}
@@ -327,27 +466,6 @@ public class NFACompiler {
}
/**
- * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
- * looping state. This method creates the first of the two.
- *
- * @param sinkState the state the newly created state should point to, it should be a looping state
- * @return the newly created state
- */
- @SuppressWarnings("unchecked")
- private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState) {
-
- final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
- final State<T> firstState = createNormalState();
-
- firstState.addTake(sinkState, currentFilterFunction);
- final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
- if (ignoreCondition != null) {
- firstState.addIgnore(ignoreCondition);
- }
- return firstState;
- }
-
- /**
* Creates the given state as a looping one. Looping state is one with TAKE edge to itself and
* PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
* for each PROCEED transition branches in computation state graph can be created only once.
@@ -357,34 +475,73 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
private State<T> createLooping(final State<T> sinkState) {
- final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+ final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
final State<T> loopingState = createNormalState();
loopingState.addProceed(sinkState, trueFunction);
- loopingState.addTake(filterFunction);
+ loopingState.addTake(currentCondition);
+
+ addStopStateToLooping(loopingState);
if (ignoreCondition != null) {
final State<T> ignoreState = createNormalState();
- ignoreState.addTake(loopingState, filterFunction);
+ ignoreState.addTake(loopingState, currentCondition);
ignoreState.addIgnore(ignoreCondition);
loopingState.addIgnore(ignoreState, ignoreCondition);
- }
+ addStopStateToLooping(ignoreState);
+ }
return loopingState;
}
/**
- * Creates a state with {@link State.StateType#Normal} and adds it to the collection of created states.
- * Should be used instead of instantiating with new operator.
+ * Patterns with quantifiers AT_LEAST_ONE_* are created as a pair of states: a singleton state and
+ * looping state. This method creates the first of the two.
*
- * @return the created state
+ * @param sinkState the state the newly created state should point to, it should be a looping state
+ * @return the newly created state
*/
- private State<T> createNormalState() {
- final State<T> state = new State<>(currentPattern.getName(), State.StateType.Normal);
- states.add(state);
- return state;
+ @SuppressWarnings("unchecked")
+ private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
+ final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+
+ final State<T> firstState = createNormalState();
+ firstState.addTake(sinkState, currentCondition);
+
+ final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+ if (ignoreCondition != null) {
+ firstState.addIgnore(ignoreCondition);
+ }
+ return firstState;
+ }
+
+ /**
+ * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
+ *
+ * @param loopingState the first state of zeroOrMore complex state
+ * @param lastSink the state that the looping one points to
+ * @return the newly created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
+ final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+
+ final State<T> firstState = createNormalState();
+ firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
+ firstState.addTake(loopingState, currentCondition);
+
+ final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+ if (ignoreFunction != null) {
+ final State<T> firstStateWithoutProceed = createNormalState();
+ firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
+ firstStateWithoutProceed.addIgnore(ignoreFunction);
+ firstStateWithoutProceed.addTake(loopingState, currentCondition);
+
+ addStopStates(firstStateWithoutProceed);
+ }
+ return firstState;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index cd62c0d..b6374cd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -218,7 +218,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
}
}
- long count = 0;
@Override
public void onEventTime(InternalTimer<KEY, VoidNamespace> timer) throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index b100bc5..3cf25ef 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -210,7 +210,18 @@ public class Pattern<T, F extends T> {
* @return A new pattern which is appended to this one
*/
public Pattern<T, T> next(final String name) {
- return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT);
+ return new Pattern<>(name, this, ConsumingStrategy.STRICT);
+ }
+
+ /**
+ * Appends a new pattern to the existing one. The new pattern enforces that there is no event matching this pattern
+ * right after the preceding matched event.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ public Pattern<T, T> notNext(final String name) {
+ return new Pattern<>(name, this, ConsumingStrategy.NOT_NEXT);
}
/**
@@ -226,6 +237,26 @@ public class Pattern<T, F extends T> {
}
/**
+ * Appends a new pattern to the existing one. The new pattern enforces that there is no event matching this pattern
+ * between the preceding pattern and succeeding this one.
+ *
+ * <p><b>NOTE:</b> There has to be other pattern after this one.
+ *
+ * @param name Name of the new pattern
+ * @return A new pattern which is appended to this one
+ */
+ public Pattern<T, T> notFollowedBy(final String name) {
+ if (quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+ throw new MalformedPatternException(
+ "Specifying a pattern with an optional path to NOT condition is not supported yet. " +
+ "You can simulate such pattern with two independent patterns, one with and the other without " +
+ "the optional part.");
+ }
+
+ return new Pattern<>(name, this, ConsumingStrategy.NOT_FOLLOW);
+ }
+
+ /**
* Appends a new pattern to the existing one. The new pattern enforces non-strict
* temporal contiguity. This means that a matching event of this pattern and the
* preceding matching event might be interleaved with other events which are ignored.
@@ -263,6 +294,7 @@ public class Pattern<T, F extends T> {
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> oneOrMore() {
+ checkIfNoNotPattern();
checkIfQuantifierApplied();
this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
return this;
@@ -277,6 +309,7 @@ public class Pattern<T, F extends T> {
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> times(int times) {
+ checkIfNoNotPattern();
checkIfQuantifierApplied();
Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
@@ -341,6 +374,13 @@ public class Pattern<T, F extends T> {
return this;
}
+ private void checkIfNoNotPattern() {
+ if (quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_FOLLOW ||
+ quantifier.getConsumingStrategy() == ConsumingStrategy.NOT_NEXT) {
+ throw new MalformedPatternException("Option not applicable to NOT pattern");
+ }
+ }
+
private void checkIfQuantifierApplied() {
if (!quantifier.hasProperty(Quantifier.QuantifierProperty.SINGLE)) {
throw new MalformedPatternException("Already applied quantifier to this Pattern. " +
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index b0f882c..382c3ba 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -83,9 +83,10 @@ public class Quantifier {
}
public void optional() {
- if (hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
- throw new MalformedPatternException("Optional already applied!");
- }
+ checkPattern(!hasProperty(QuantifierProperty.OPTIONAL), "Optional already applied!");
+ checkPattern(!(consumingStrategy == ConsumingStrategy.NOT_NEXT ||
+ consumingStrategy == ConsumingStrategy.NOT_FOLLOW), "NOT pattern cannot be optional");
+
properties.add(Quantifier.QuantifierProperty.OPTIONAL);
}
@@ -120,7 +121,10 @@ public class Quantifier {
public enum ConsumingStrategy {
STRICT,
SKIP_TILL_NEXT,
- SKIP_TILL_ANY
+ SKIP_TILL_ANY,
+
+ NOT_FOLLOW,
+ NOT_NEXT
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index fe31564..ab6ff82 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -393,8 +393,7 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), true);
for (StreamRecord<Event> event: events) {
- Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns =
- nfa.process(event.getValue(), event.getTimestamp());
+ final Tuple2<Collection<Map<String, Event>>, Collection<Tuple2<Map<String, Event>, Long>>> patterns = nfa.process(event.getValue(), event.getTimestamp());
Collection<Map<String, Event>> matchedPatterns = patterns.f0;
Collection<Tuple2<Map<String, Event>, Long>> timeoutPatterns = patterns.f1;
@@ -786,14 +785,16 @@ public class NFAITCase extends TestLogger {
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
- }).oneOrMore().allowCombinations().optional().followedByAny("middle-second").where(new SimpleCondition<Event>() {
+ }).oneOrMore().allowCombinations().optional()
+ .followedBy("middle-second").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("d");
}
- }).oneOrMore().allowCombinations().optional().followedBy("end").where(new SimpleCondition<Event>() {
+ }).oneOrMore().allowCombinations().optional()
+ .followedBy("end").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 5726188262756267490L;
@Override
@@ -818,13 +819,11 @@ public class NFAITCase extends TestLogger {
}
}
- assertEquals(8, allPatterns.size());
+ assertEquals(6, allPatterns.size());
assertEquals(Sets.newHashSet(
Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
- Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end),
Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
- Sets.newHashSet(startEvent, middleEvent3, end),
Sets.newHashSet(startEvent, middleEvent2, end),
Sets.newHashSet(startEvent, middleEvent1, end),
Sets.newHashSet(startEvent, end)
@@ -3147,7 +3146,1048 @@ public class NFAITCase extends TestLogger {
}
- ///////////////////////////////////////// Utility ////////////////////////////////
+ ///////////////////////////////////////// Not pattern /////////////////////////////////////////////////
+
+ @Test
+ public void testNotNext() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d),
+ Lists.newArrayList(a1, c2, d)
+ ));
+ }
+
+ @Test
+ public void testNotNextNoMatches() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ assertEquals(0, matches.size());
+ }
+
+ @Test
+ public void testNotNextNoMatchesAtTheEnd() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+ Event b1 = new Event(42, "b", 3.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(c2, 3));
+ inputEvents.add(new StreamRecord<>(d, 4));
+ inputEvents.add(new StreamRecord<>(b1, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ assertEquals(0, matches.size());
+ }
+
+ @Test
+ public void testNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches,Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches,Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d)
+ ));
+ }
+
+ @Test
+ public void testTimesWithNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(41, "b", 2.0);
+ Event c = new Event(42, "c", 3.0);
+ Event b2 = new Event(43, "b", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c, 3));
+ inputEvents.add(new StreamRecord<>(b2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches,Lists.<List<Event>>newArrayList());
+ }
+
+ @Test
+ public void testIgnoreStateOfTimesWithNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event e = new Event(41, "e", 2.0);
+ Event c1 = new Event(42, "c", 3.0);
+ Event b1 = new Event(43, "b", 4.0);
+ Event c2 = new Event(44, "c", 5.0);
+ Event d1 = new Event(45, "d", 6.0);
+ Event d2 = new Event(46, "d", 7.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(d1, 2));
+ inputEvents.add(new StreamRecord<>(e, 1));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d2, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, d1)
+ ));
+ }
+
+ @Test
+ public void testTimesWithNotFollowedByAfter() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event e = new Event(41, "e", 2.0);
+ Event c1 = new Event(42, "c", 3.0);
+ Event b1 = new Event(43, "b", 4.0);
+ Event b2 = new Event(44, "b", 5.0);
+ Event d1 = new Event(46, "d", 7.0);
+ Event d2 = new Event(47, "d", 8.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(d1, 2));
+ inputEvents.add(new StreamRecord<>(e, 1));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(b2, 3));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(d2, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList());
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOptionalAtTheEnd() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).optional();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches,Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1),
+ Lists.newArrayList(a1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOptionalTimes() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches,Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, c2, d)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByWithBranchingAtStart() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event a2 = new Event(41, "a", 4.0);
+ Event c2 = new Event(43, "c", 5.0);
+ Event d = new Event(43, "d", 6.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c1, 3));
+ inputEvents.add(new StreamRecord<>(a2, 4));
+ inputEvents.add(new StreamRecord<>(c2, 5));
+ inputEvents.add(new StreamRecord<>(d, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a2, c2, d)
+ ));
+ }
+
+ private static class NotFollowByData {
+ static final Event a1 = new Event(40, "a", 1.0);
+ static final Event b1 = new Event(41, "b", 2.0);
+ static final Event b2 = new Event(42, "b", 3.0);
+ static final Event b3 = new Event(42, "b", 4.0);
+ static final Event c1 = new Event(43, "c", 5.0);
+ static final Event b4 = new Event(42, "b", 6.0);
+ static final Event b5 = new Event(42, "b", 7.0);
+ static final Event b6 = new Event(42, "b", 8.0);
+ static final Event d1 = new Event(43, "d", 9.0);
+
+ private NotFollowByData() {
+ }
+ }
+
+ @Test
+ public void testNotNextAfterZeroOrMoreSkipTillNext() {
+ final List<List<Event>> matches = testNotNextAfterZeroOrMore(false);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotNextAfterZeroOrMoreSkipTillAny() {
+ final List<List<Event>> matches = testNotNextAfterZeroOrMore(true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.d1)
+ ));
+ }
+
+ private List<List<Event>> testNotNextAfterZeroOrMore(boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore().optional()
+ .notNext("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ })
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotNextAfterOneOrMoreSkipTillNext() {
+ final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
+ assertEquals(0, matches.size());
+ }
+
+ @Test
+ public void testNotNextAfterOneOrMoreSkipTillAny() {
+ final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1)
+ ));
+ }
+
+ private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore()
+ .notNext("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ })
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotFollowedByNextAfterOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
+ assertEquals(0, matches.size());
+ }
+
+ @Test
+ public void testNotFollowedByAnyAfterOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByNextAfterOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
+ assertEquals(0, matches.size());
+ }
+
+ @Test
+ public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
+ ));
+ }
+
+ private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b3, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ })
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotFollowedByAnyBeforeOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore();
+
+ pattern = (eager ? pattern : pattern.allowCombinations())
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
+ Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
+ ));
+ }
+
+ private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore().optional();
+
+ pattern = (eager ? pattern : pattern.allowCombinations())
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ ///////////////////////////////////////// Utility /////////////////////////////////////////////////
+
private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
List<List<Event>> resultingPatterns = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/5795ebe1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index ced9efe..90a6321 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -87,6 +87,21 @@ public class NFACompilerTest extends TestLogger {
NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
}
+ @Test
+ public void testNFACompilerPatternEndsWithNotFollowedBy() {
+
+ // adjust the rule
+ expectedException.expect(MalformedPatternException.class);
+ expectedException.expectMessage("NotFollowedBy is not supported as a last part of a Pattern!");
+
+ Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
+ .followedBy("middle").where(new TestFilter())
+ .notFollowedBy("end").where(new TestFilter());
+
+ // here we must have an exception because of the two "start" patterns with the same name.
+ NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+ }
+
/**
* A filter implementation to test invalid pattern specification with
* duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}.
@@ -149,6 +164,26 @@ public class NFACompilerTest extends TestLogger {
assertEquals(0, endingState.getStateTransitions().size());
}
+ @Test
+ public void testNoUnnecessaryStateCopiesCreated() {
+ final Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+ .notFollowedBy("not").where(startFilter)
+ .followedBy("oneOrMore").where(startFilter).oneOrMore()
+ .followedBy("end").where(endFilter);
+
+ final NFACompiler.NFAFactoryCompiler<Event> nfaFactoryCompiler = new NFACompiler.NFAFactoryCompiler<>(pattern);
+ nfaFactoryCompiler.compileFactory();
+
+ int endStateCount = 0;
+ for (State<Event> state : nfaFactoryCompiler.getStates()) {
+ if (state.getName().equals("end")) {
+ endStateCount++;
+ }
+ }
+
+ assertEquals(1, endStateCount);
+ }
+
private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
for (StateTransition<T> transition : state.getStateTransitions()) {