You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/23 12:47:24 UTC
[1/3] flink git commit: [FLINK-3318] Backward compatibility of CEP NFA
Repository: flink
Updated Branches:
refs/heads/master d0695c054 -> d20fb090c
[FLINK-3318] Backward compatibility of CEP NFA
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090
Branch: refs/heads/master
Commit: d20fb090c31858bc0372a8c84228d796558d56b0
Parents: 9001c4e
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Tue Mar 21 10:53:07 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++-
.../org/apache/flink/cep/nfa/SharedBuffer.java | 49 ++
.../java/org/apache/flink/cep/nfa/State.java | 19 +-
.../flink/cep/nfa/compiler/NFACompiler.java | 116 +++++
.../org/apache/flink/cep/nfa/NFAITCase.java | 194 +++++++-
.../cep/operator/CEPMigration12to13Test.java | 477 +++++++++++++++++++
.../test/resources/cep-branching-snapshot-1.2 | Bin 0 -> 6736 bytes
.../resources/cep-single-pattern-snapshot-1.2 | Bin 0 -> 3311 bytes
.../test/resources/cep-starting-snapshot-1.2 | Bin 0 -> 6526 bytes
9 files changed, 986 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3d42248..ab03566 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedHashMultimap;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,18 +27,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.windowing.time.Time;
+import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OptionalDataException;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -85,9 +91,9 @@ public class NFA<T> implements Serializable {
private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
/**
- * Buffer used to store the matched events.
+ * Used only for backward compatibility. Buffer used to store the matched events.
*/
- private final SharedBuffer<String, T> sharedBuffer;
+ private final SharedBuffer<State<T>, T> sharedBuffer = null;
/**
* A set of all the valid NFA states, as returned by the
@@ -110,12 +116,22 @@ public class NFA<T> implements Serializable {
private final boolean handleTimeout;
/**
+ * Used only for backward compatibility.
+ */
+ private int startEventCounter;
+
+ /**
* Current set of {@link ComputationState computation states} within the state machine.
* These are the "active" intermediate states that are waiting for new matching
* events to transition to new valid states.
*/
private transient Queue<ComputationState<T>> computationStates;
+ /**
+ * Buffer used to store the matched events.
+ */
+ private final SharedBuffer<String, T> stringSharedBuffer;
+
public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
@@ -124,7 +140,7 @@ public class NFA<T> implements Serializable {
this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
- sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+ stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
computationStates = new LinkedList<>();
states = new HashSet<>();
@@ -156,7 +172,7 @@ public class NFA<T> implements Serializable {
* {@code false} otherwise.
*/
public boolean isEmpty() {
- return sharedBuffer.isEmpty();
+ return stringSharedBuffer.isEmpty();
}
/**
@@ -194,9 +210,14 @@ public class NFA<T> implements Serializable {
}
}
- // remove computation state which has exceeded the window length
- sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
- sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+ stringSharedBuffer.release(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ stringSharedBuffer.remove(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
newComputationStates = Collections.emptyList();
} else if (event != null) {
@@ -212,8 +233,8 @@ public class NFA<T> implements Serializable {
result.addAll(matches);
// remove found patterns because they are no longer needed
- sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
- sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
} else {
// add new computation state; it will be processed once the next event arrives
computationStates.add(newComputationState);
@@ -230,7 +251,7 @@ public class NFA<T> implements Serializable {
// remove all elements which are expired
// with respect to the window length
- sharedBuffer.prune(pruningTimestamp);
+ stringSharedBuffer.prune(pruningTimestamp);
}
}
@@ -244,7 +265,7 @@ public class NFA<T> implements Serializable {
NFA<T> other = (NFA<T>) obj;
return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
- sharedBuffer.equals(other.sharedBuffer) &&
+ stringSharedBuffer.equals(other.stringSharedBuffer) &&
states.equals(other.states) &&
windowTime == other.windowTime;
} else {
@@ -254,7 +275,7 @@ public class NFA<T> implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+ return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime);
}
private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -376,7 +397,7 @@ public class NFA<T> implements Serializable {
computationState.getStartTimestamp()
)
);
- sharedBuffer.lock(
+ stringSharedBuffer.lock(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
@@ -397,14 +418,14 @@ public class NFA<T> implements Serializable {
final long startTimestamp;
if (computationState.isStartState()) {
startTimestamp = timestamp;
- sharedBuffer.put(
+ stringSharedBuffer.put(
consumingState.getName(),
event,
timestamp,
currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
- sharedBuffer.put(
+ stringSharedBuffer.put(
consumingState.getName(),
event,
timestamp,
@@ -415,7 +436,7 @@ public class NFA<T> implements Serializable {
}
// a new computation state is referring to the shared entry
- sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
resultingComputationStates.add(ComputationState.createState(
newState,
@@ -429,7 +450,7 @@ public class NFA<T> implements Serializable {
//check if newly created state is optional (have a PROCEED path to Final state)
final State<T> finalState = findFinalStateAfterProceed(newState, event);
if (finalState != null) {
- sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
resultingComputationStates.add(ComputationState.createState(
finalState,
consumingState,
@@ -450,12 +471,12 @@ public class NFA<T> implements Serializable {
if (computationState.getEvent() != null) {
// release the shared entry referenced by the current computation state.
- sharedBuffer.release(
+ stringSharedBuffer.release(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
// try to remove unnecessary shared buffer entries
- sharedBuffer.remove(
+ stringSharedBuffer.remove(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
@@ -546,7 +567,7 @@ public class NFA<T> implements Serializable {
* @return Collection of event sequences which end in the given computation state
*/
private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
- Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+ Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp(),
@@ -592,6 +613,8 @@ public class NFA<T> implements Serializable {
nonDuplicatingTypeSerializer.clearReferences();
}
+ private final static String BEGINNING_STATE_NAME = "$beginningState$";
+
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ois.defaultReadObject();
@@ -599,15 +622,103 @@ public class NFA<T> implements Serializable {
computationStates = new LinkedList<>();
+ final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
+
+ boolean afterMigration = false;
for (int i = 0; i < numberComputationStates; i++) {
ComputationState<T> computationState = readComputationState(ois);
+ if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
+ afterMigration = true;
+ }
- computationStates.offer(computationState);
+ readComputationStates.add(computationState);
+ }
+
+ if (afterMigration && !readComputationStates.isEmpty()) {
+ try {
+ //Backwards compatibility
+ this.computationStates.addAll(migrateNFA(readComputationStates));
+ final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer");
+ final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
+ sharedBufferField.setAccessible(true);
+ newSharedBufferField.setAccessible(true);
+ newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
+ sharedBufferField.set(this, null);
+ sharedBufferField.setAccessible(false);
+ newSharedBufferField.setAccessible(false);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not migrate from earlier version", e);
+ }
+ } else {
+ this.computationStates.addAll(readComputationStates);
}
nonDuplicatingTypeSerializer.clearReferences();
}
+ /**
+ * Needed for backward compatibility. First migrates the {@link State} graph, see {@link NFACompiler#migrateGraph(State)}.
+ * Then recreates the {@link ComputationState}s with the new {@link State} graph.
+ * @param readStates computation states read from snapshot
+ * @return collection of migrated computation states
+ */
+ private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
+ final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
+
+ final State<T> startState = Iterators.find(
+ readStates.iterator(),
+ new Predicate<ComputationState<T>>() {
+ @Override
+ public boolean apply(@Nullable ComputationState<T> input) {
+ return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
+ }
+ }).getState();
+
+ final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
+
+ for (ComputationState<T> readState : readStates) {
+ if (!readState.isStartState()) {
+ final String previousName = readState.getState().getName();
+ final String currentName = Iterators.find(
+ readState.getState().getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+ }).getTargetState().getName();
+
+
+ final State<T> previousState = convertedStates.get(previousName);
+
+ computationStates.add(ComputationState.createState(
+ convertedStates.get(currentName),
+ previousState,
+ readState.getEvent(),
+ readState.getTimestamp(),
+ readState.getVersion(),
+ readState.getStartTimestamp()
+ ));
+ }
+ }
+
+ final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
+ @Override
+ public boolean apply(@Nullable State<T> input) {
+ return input != null && input.isStart();
+ }
+ }).getName();
+
+ computationStates.add(ComputationState.createStartState(
+ convertedStates.get(startName),
+ new DeweyNumber(this.startEventCounter)));
+
+ this.states.clear();
+ this.states.addAll(convertedStates.values());
+
+ return computationStates;
+ }
+
private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
oos.writeObject(computationState.getState());
oos.writeObject(computationState.getPreviousState());
@@ -629,7 +740,13 @@ public class NFA<T> implements Serializable {
@SuppressWarnings("unchecked")
private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
final State<T> state = (State<T>)ois.readObject();
- final State<T> previousState = (State<T>)ois.readObject();
+ State<T> previousState;
+ try {
+ previousState = (State<T>)ois.readObject();
+ } catch (OptionalDataException e) {
+ previousState = null;
+ }
+
final long timestamp = ois.readLong();
final DeweyNumber version = (DeweyNumber)ois.readObject();
final long startTimestamp = ois.readLong();
@@ -647,6 +764,7 @@ public class NFA<T> implements Serializable {
return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
}
+
/**
* Generates a state name from a given name template and an index.
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index e6a8c75..d5b7876 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -463,6 +464,54 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
}
+ private SharedBuffer(
+ TypeSerializer<V> valueSerializer,
+ Map<K, SharedBufferPage<K, V>> pages) {
+ this.valueSerializer = valueSerializer;
+ this.pages = pages;
+ }
+
+ /**
+ * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
+ * Now it is {@link String}.
+ */
+ @Internal
+ static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
+
+ final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
+ final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
+
+ for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+ final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
+ pageMap.put(newPage.getKey(), newPage);
+
+ for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+ final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
+ pageEntry.getKey(),
+ newPage);
+ newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
+ entries.put(pageEntry.getValue(), newSharedBufferEntry);
+ newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
+ }
+ }
+
+ for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+ for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+ final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
+ for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
+ final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
+
+ final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
+ targetNewEntry,
+ edge.getVersion());
+ newEntry.edges.add(newEdge);
+ }
+ }
+ }
+
+ return new SharedBuffer<>(buffer.valueSerializer, pageMap);
+ }
+
private SharedBufferEntry<K, V> get(
final K key,
final V value,
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 7bcb6ea..27e0dcd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.common.functions.FilterFunction;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Objects;
/**
@@ -62,7 +65,6 @@ public class State<T> implements Serializable {
return stateTransitions;
}
-
private void addStateTransition(
final StateTransitionAction action,
final State<T> targetState,
@@ -132,4 +134,19 @@ public class State<T> implements Serializable {
Final, // the state is a final state for the NFA
Normal // the state is neither a start nor a final state
}
+
+ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+ ois.defaultReadObject();
+
+ //Backward compatibility. Previous version of StateTransition did not have source state
+ if (!stateTransitions.isEmpty() && stateTransitions.iterator().next().getSourceState() == null) {
+ final List<StateTransition<T>> tmp = new ArrayList<>();
+ tmp.addAll(this.stateTransitions);
+
+ this.stateTransitions.clear();
+ for (StateTransition<T> transition : tmp) {
+ addStateTransition(transition.getAction(), transition.getTargetState(), transition.getCondition());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b476c49..8bd8612 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,10 +18,15 @@
package org.apache.flink.cep.nfa.compiler;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
import org.apache.flink.cep.pattern.FilterFunctions;
import org.apache.flink.cep.pattern.FollowedByPattern;
import org.apache.flink.cep.pattern.MalformedPatternException;
@@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
import org.apache.flink.streaming.api.windowing.time.Time;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -363,6 +371,114 @@ public class NFACompiler {
}
/**
+ * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
+ * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes each state
+ * has at most one TAKE and one IGNORE, and that the name of each state is unique. No PROCEED transition is allowed!
+ *
+ * @param oldStartState dummy start state of old graph
+ * @param <T> type of events
+ * @return map of new states, where key is the name of a state and value is the state itself
+ */
+ @Internal
+ public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
+ State<T> oldFirst = oldStartState;
+ State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
+
+ StateTransition<T> oldFirstToSecondTake = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ });
+
+ StateTransition<T> oldFirstIgnore = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.IGNORE;
+ }
+
+ }, null);
+
+ StateTransition<T> oldSecondToThirdTake = Iterators.find(
+ oldSecond.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ }, null);
+
+ final Map<String, State<T>> convertedStates = new HashMap<>();
+ State<T> newSecond;
+ State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
+ convertedStates.put(newFirst.getName(), newFirst);
+ while (oldSecondToThirdTake != null) {
+
+ newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
+ convertedStates.put(newSecond.getName(), newSecond);
+ newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
+
+ if (oldFirstIgnore != null) {
+ newFirst.addIgnore(oldFirstIgnore.getCondition());
+ }
+
+ oldFirst = oldSecond;
+
+ oldFirstToSecondTake = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ });
+
+ oldFirstIgnore = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.IGNORE;
+ }
+
+ }, null);
+
+ oldSecond = oldSecondToThirdTake.getTargetState();
+
+ oldSecondToThirdTake = Iterators.find(
+ oldSecond.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ }, null);
+
+ newFirst = newSecond;
+ }
+
+ final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+
+ newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
+
+ if (oldFirstIgnore != null) {
+ newFirst.addIgnore(oldFirstIgnore.getCondition());
+ }
+
+ convertedStates.put(endingState.getName(), endingState);
+
+ return convertedStates;
+ }
+
+ /**
* Factory interface for {@link NFA}.
*
* @param <T> Type of the input events which are processed by the NFA
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 825ba957..5b05f19 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class NFAITCase extends TestLogger {
@@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testComplexBranchingAfterKleeneStar() {
+ public void testComplexBranchingAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStar() {
+ public void testZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testEagerKleeneStar() {
+ public void testEagerZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger {
@Test
- public void testBeginWithKleeneStar() {
+ public void testBeginWithZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event middleEvent1 = new Event(40, "a", 2.0);
@@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStarAfterKleeneStar() {
+ public void testZeroOrMoreAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStarAfterBranching() {
+ public void testZeroOrMoreAfterBranching() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testStrictContinuityNoResultsAfterKleeneStar() {
+ public void testStrictContinuityNoResultsAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event start = new Event(40, "d", 2.0);
@@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testStrictContinuityResultsAfterKleeneStar() {
+ public void testStrictContinuityResultsAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event start = new Event(40, "d", 2.0);
@@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger {
), resultingPatterns);
}
+ /**
+ * Clearing SharedBuffer
+ */
+
+ @Test
+ public void testTimesClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 2);
+ nfa.process(middleEvent2, 3);
+ nfa.process(middleEvent3, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+ @Test
+ public void testOptionalClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).optional().followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent, 5);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+ @Test
+ public void testAtLeastOneClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 3);
+ nfa.process(middleEvent2, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+
+ @Test
+ public void testZeroOrMoreClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 3);
+ nfa.process(middleEvent2, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
new file mode 100644
index 0000000..65fa733
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration12to13Test {
+
+ private static String getResourceFilename(String filename) {
+ ClassLoader cl = CEPMigration12to13Test.class.getClassLoader();
+ URL resource = cl.getResource(filename);
+ if (resource == null) {
+ throw new NullPointerException("Missing snapshot resource.");
+ }
+ return resource.getFile();
+ }
+
+ @Test
+ public void testMigrationAfterBranchingPattern() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+ final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new NFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processElement(new StreamRecord<Event>(startEvent, 1));
+// harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+// harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+// harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+// harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-branching-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+ "cep-branching-snapshot-1.2")));
+ final OperatorStateHandles snapshot = new OperatorStateHandles(
+ (int) ois.readObject(),
+ (StreamStateHandle) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject()
+ );
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+ harness.processElement(new StreamRecord<>(endEvent, 5));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and 2 results
+ assertEquals(3, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+ assertEquals(startEvent, patternMap1.get("start"));
+ assertEquals(middleEvent1, patternMap1.get("middle"));
+ assertEquals(endEvent, patternMap1.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+ assertEquals(startEvent, patternMap2.get("start"));
+ assertEquals(middleEvent2, patternMap2.get("middle"));
+ assertEquals(endEvent, patternMap2.get("end"));
+
+ harness.close();
+ }
+
+ @Test
+ public void testStartingNewPatternAfterMigration() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+ final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+ final Event startEvent2 = new Event(42, "start", 5.0);
+ final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+ final Event endEvent = new Event(42, "end", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new NFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+// harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+// harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+// harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-starting-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new NFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+ "cep-starting-snapshot-1.2")));
+ final OperatorStateHandles snapshot = new OperatorStateHandles(
+ (int) ois.readObject(),
+ (StreamStateHandle) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject()
+ );
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(startEvent2, 5));
+ harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+ harness.processElement(new StreamRecord<>(endEvent, 7));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and 3 results
+ assertEquals(4, result.size());
+
+ Object resultObject1 = result.poll();
+ assertTrue(resultObject1 instanceof StreamRecord);
+ StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+ assertTrue(resultRecord1.getValue() instanceof Map);
+
+ Object resultObject2 = result.poll();
+ assertTrue(resultObject2 instanceof StreamRecord);
+ StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+ assertTrue(resultRecord2.getValue() instanceof Map);
+
+ Object resultObject3 = result.poll();
+ assertTrue(resultObject3 instanceof StreamRecord);
+ StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+ assertTrue(resultRecord3.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+ assertEquals(startEvent1, patternMap1.get("start"));
+ assertEquals(middleEvent1, patternMap1.get("middle"));
+ assertEquals(endEvent, patternMap1.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+ assertEquals(startEvent1, patternMap2.get("start"));
+ assertEquals(middleEvent2, patternMap2.get("middle"));
+ assertEquals(endEvent, patternMap2.get("end"));
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+
+ assertEquals(startEvent2, patternMap3.get("start"));
+ assertEquals(middleEvent2, patternMap3.get("middle"));
+ assertEquals(endEvent, patternMap3.get("end"));
+
+ harness.close();
+ }
+
+ @Test
+ public void testSinglePatternAfterMigration() throws Exception {
+
+ KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+ private static final long serialVersionUID = -4873366487571254798L;
+
+ @Override
+ public Integer getKey(Event value) throws Exception {
+ return value.getId();
+ }
+ };
+
+ final Event startEvent1 = new Event(42, "start", 1.0);
+
+ // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new SinglePatternNFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-single-pattern-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+ OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ new KeyedCEPPatternOperator<>(
+ Event.createTypeSerializer(),
+ false,
+ keySelector,
+ IntSerializer.INSTANCE,
+ new SinglePatternNFAFactory(),
+ true),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO);
+
+ harness.setup();
+ final ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+ "cep-single-pattern-snapshot-1.2")));
+ final OperatorStateHandles snapshot = new OperatorStateHandles(
+ (int) ois.readObject(),
+ (StreamStateHandle) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<KeyGroupsStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject(),
+ (Collection<OperatorStateHandle>) ois.readObject()
+ );
+ harness.initializeState(snapshot);
+ harness.open();
+
+ harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+ harness.processWatermark(new Watermark(20));
+
+ ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+ // watermark and the result
+ assertEquals(2, result.size());
+
+ Object resultObject = result.poll();
+ assertTrue(resultObject instanceof StreamRecord);
+ StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+ assertTrue(resultRecord.getValue() instanceof Map);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+ assertEquals(startEvent1, patternMap.get("start"));
+
+ harness.close();
+ }
+
+ private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
+
+ private static final long serialVersionUID = 1173020762472766713L;
+
+ private final boolean handleTimeout;
+
+ private SinglePatternNFAFactory() {
+ this(false);
+ }
+
+ private SinglePatternNFAFactory(boolean handleTimeout) {
+ this.handleTimeout = handleTimeout;
+ }
+
+ @Override
+ public NFA<Event> createNFA() {
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+ .within(Time.milliseconds(10L));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ }
+ }
+
+ private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+ private static final long serialVersionUID = 1173020762472766713L;
+
+ private final boolean handleTimeout;
+
+ private NFAFactory() {
+ this(false);
+ }
+
+ private NFAFactory(boolean handleTimeout) {
+ this.handleTimeout = handleTimeout;
+ }
+
+ @Override
+ public NFA<Event> createNFA() {
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
+ .followedBy("middle")
+ .subtype(SubEvent.class)
+ .where(new MiddleFilter())
+ .followedBy("end")
+ .where(new EndFilter())
+ // add a window timeout to test whether timestamps of elements in the
+ // priority queue in CEP operator are correctly checkpointed/restored
+ .within(Time.milliseconds(10L));
+
+ return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ }
+ }
+
+ private static class StartFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }
+
+ private static class MiddleFilter implements FilterFunction<SubEvent> {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(SubEvent value) throws Exception {
+ return value.getVolume() > 5.0;
+ }
+ }
+
+ private static class EndFilter implements FilterFunction<Event> {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ
[3/3] flink git commit: [FLINK-3318] Add support for quantifiers to
CEP's pattern API
Posted by kk...@apache.org.
[FLINK-3318] Add support for quantifiers to CEP's pattern API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef
Branch: refs/heads/master
Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a
Parents: d0695c0
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Sat Mar 18 20:53:00 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100
----------------------------------------------------------------------
docs/dev/libs/cep.md | 78 +-
.../flink/cep/scala/pattern/Pattern.scala | 81 +-
.../apache/flink/cep/nfa/ComputationState.java | 44 +-
.../org/apache/flink/cep/nfa/DeweyNumber.java | 17 +-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 398 ++++--
.../org/apache/flink/cep/nfa/SharedBuffer.java | 25 +-
.../java/org/apache/flink/cep/nfa/State.java | 32 +-
.../apache/flink/cep/nfa/StateTransition.java | 20 +-
.../flink/cep/nfa/StateTransitionAction.java | 4 +-
.../nfa/compiler/MalformedPatternException.java | 32 -
.../flink/cep/nfa/compiler/NFACompiler.java | 317 ++++-
.../flink/cep/pattern/FilterFunctions.java | 44 +
.../cep/pattern/MalformedPatternException.java | 32 +
.../flink/cep/pattern/NotFilterFunction.java | 42 +
.../org/apache/flink/cep/pattern/Pattern.java | 115 ++
.../apache/flink/cep/pattern/Quantifier.java | 54 +
.../org/apache/flink/cep/nfa/NFAITCase.java | 1338 +++++++++++++++++-
.../java/org/apache/flink/cep/nfa/NFATest.java | 90 +-
.../flink/cep/nfa/compiler/NFACompilerTest.java | 152 +-
.../apache/flink/cep/pattern/PatternTest.java | 54 +
20 files changed, 2595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 8047481..22cffbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class);
patternState.within(Time.seconds(10));
{% endhighlight %}
</td>
- </tr>
+ </tr>
+ <tr>
+ <td><strong>ZeroOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (by default), for a pattern A*B the sequence A1 A2 B will generate the patterns: B, A1 B and A1 A2 B. If disabled: B, A1 B, A2 B and A1 A2 B.</p>
+ {% highlight java %}
+ patternState.zeroOrMore();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>OneOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and at most an infinite number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (by default), for a pattern A+B the sequence A1 A2 B will generate the patterns: A1 B and A1 A2 B. If disabled: A1 B, A2 B and A1 A2 B.</p>
+ {% highlight java %}
+ patternState.oneOrMore();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Optional</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero times or once.</p>
+ {% highlight java %}
+ patternState.optional();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Times</strong></td>
+ <td>
+ <p>Specifies the exact number of times that this pattern should be matched.</p>
+ {% highlight java %}
+ patternState.times(2);
+ {% endhighlight %}
+ </td>
+ </tr>
</tbody>
</table>
</div>
@@ -419,6 +457,44 @@ patternState.within(Time.seconds(10))
{% endhighlight %}
</td>
</tr>
+ <tr>
+ <td><strong>ZeroOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (by default), for a pattern A*B the sequence A1 A2 B will generate the patterns: B, A1 B and A1 A2 B. If disabled: B, A1 B, A2 B and A1 A2 B.</p>
+ {% highlight scala %}
+ patternState.zeroOrMore()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>OneOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and at most an infinite number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (by default), for a pattern A+B the sequence A1 A2 B will generate the patterns: A1 B and A1 A2 B. If disabled: A1 B, A2 B and A1 A2 B.</p>
+ {% highlight scala %}
+ patternState.oneOrMore()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Optional</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero times or once.</p>
+ {% highlight scala %}
+ patternState.optional()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Times</strong></td>
+ <td>
+ <p>Specifies the exact number of times that this pattern should be matched.</p>
+ {% highlight scala %}
+ patternState.times(2)
+ {% endhighlight %}
+ </td>
+ </tr>
</tbody>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cc3b03c..5baf780 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.cep
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
import org.apache.flink.streaming.api.windowing.time.Time
/**
@@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
/**
*
+ * @return currently applied quantifier to this pattern
+ */
+ def getQuantifier: Quantifier = jPattern.getQuantifier
+
+ /**
+ *
* @return Filter condition for an event to be matched
*/
def getFilterFunction(): Option[FilterFunction[F]] = {
@@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
wrapPattern(jPattern.getPrevious())
}
+ /**
+ * Specifies that this pattern can occur zero or more times (Kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene star operator
+ */
+ def zeroOrMore: Pattern[T, F] = {
+ jPattern.zeroOrMore()
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur zero or more times (Kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * If eagerness is enabled, for a pattern A*B the sequence A1 A2 B will generate the patterns:
+ * B, A1 B and A1 A2 B. If disabled: B, A1 B, A2 B and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene star operator
+ */
+ def zeroOrMore(eager: Boolean): Pattern[T, F] = {
+ jPattern.zeroOrMore(eager)
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most an infinite number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene plus operator
+ */
+ def oneOrMore: Pattern[T, F] = {
+ jPattern.oneOrMore()
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most an infinite number of events can be matched in this state.
+ *
+ * If eagerness is enabled, for a pattern A+B the sequence A1 A2 B will generate the patterns:
+ * A1 B and A1 A2 B. If disabled: A1 B, A2 B and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene plus operator
+ */
+ def oneOrMore(eager: Boolean): Pattern[T, F] = {
+ jPattern.oneOrMore(eager)
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur zero times or once.
+ *
+ * @return The same pattern with applied Kleene ? operator
+ */
+ def optional: Pattern[T, F] = {
+ jPattern.optional()
+ this
+ }
+
+ /**
+ * Specifies the exact number of times that this pattern should be matched.
+ *
+ * @param times number of times matching event must appear
+ * @return The same pattern with number of times applied
+ */
+ def times(times: Int): Pattern[T, F] = {
+ jPattern.times(times)
+ this
+ }
+
}
object Pattern {
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 3f44fba..445d038 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.util.Preconditions;
+
/**
* Helper class which encapsulates the state of the NFA computation. It points to the current state,
* the last taken event, its occurrence timestamp, the current version and the starting timestamp
@@ -41,17 +43,21 @@ public class ComputationState<T> {
// Timestamp of the first element in the pattern
private final long startTimestamp;
- public ComputationState(
- final State<T> currentState,
- final T event,
- final long timestamp,
- final DeweyNumber version,
- final long startTimestamp) {
+ private final State<T> previousState;
+
+ private ComputationState(
+ final State<T> currentState,
+ final State<T> previousState,
+ final T event,
+ final long timestamp,
+ final DeweyNumber version,
+ final long startTimestamp) {
this.state = currentState;
this.event = event;
this.timestamp = timestamp;
this.version = version;
this.startTimestamp = startTimestamp;
+ this.previousState = previousState;
}
public boolean isFinalState() {
@@ -59,7 +65,7 @@ public class ComputationState<T> {
}
public boolean isStartState() {
- return state.isStart();
+ return state.isStart() && event == null;
}
public long getTimestamp() {
@@ -74,6 +80,10 @@ public class ComputationState<T> {
return state;
}
+ public State<T> getPreviousState() {
+ return previousState;
+ }
+
public T getEvent() {
return event;
}
@@ -81,4 +91,24 @@ public class ComputationState<T> {
public DeweyNumber getVersion() {
return version;
}
+
+ public static <T> ComputationState<T> createStartState(final State<T> state) {
+ Preconditions.checkArgument(state.isStart());
+ return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L);
+ }
+
+ public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) {
+ Preconditions.checkArgument(state.isStart());
+ return new ComputationState<>(state, null, null, -1L, version, -1L);
+ }
+
+ public static <T> ComputationState<T> createState(
+ final State<T> currentState,
+ final State<T> previousState,
+ final T event,
+ final long timestamp,
+ final DeweyNumber version,
+ final long startTimestamp) {
+ return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index bb9039d..fd3fafa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable {
this.deweyNumber = deweyNumber;
}
+ public DeweyNumber(DeweyNumber number) {
+ this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
+ }
+
/**
* Checks whether this dewey number is compatible to the other dewey number.
*
@@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable {
* @return A new dewey number derived from this whose last digit is increased by one
*/
public DeweyNumber increase() {
+ return increase(1);
+ }
+
+ /**
+ * Creates a new dewey number from this such that its last digit is increased by the supplied
+ * number.
+ *
+ * @param times how many times to increase the Dewey number
+ * @return A new dewey number derived from this whose last digit is increased by given number
+ */
+ public DeweyNumber increase(int times) {
int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
- newDeweyNumber[deweyNumber.length - 1]++;
+ newDeweyNumber[deweyNumber.length - 1] += times;
return new DeweyNumber(newDeweyNumber);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 257418a..3d42248 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,7 @@
package org.apache.flink.cep.nfa;
import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -39,7 +40,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -87,7 +87,7 @@ public class NFA<T> implements Serializable {
/**
* Buffer used to store the matched events.
*/
- private final SharedBuffer<State<T>, T> sharedBuffer;
+ private final SharedBuffer<String, T> sharedBuffer;
/**
* A set of all the valid NFA states, as returned by the
@@ -98,7 +98,7 @@ public class NFA<T> implements Serializable {
/**
* The length of a windowed pattern, as specified using the
- * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
+ * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
* method.
*/
private final long windowTime;
@@ -109,9 +109,6 @@ public class NFA<T> implements Serializable {
*/
private final boolean handleTimeout;
- // Current starting index for the next dewey version number
- private int startEventCounter;
-
/**
* Current set of {@link ComputationState computation states} within the state machine.
* These are the "active" intermediate states that are waiting for new matching
@@ -119,8 +116,6 @@ public class NFA<T> implements Serializable {
*/
private transient Queue<ComputationState<T>> computationStates;
- private StateTransitionComparator<T> stateTransitionComparator;
-
public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
@@ -129,11 +124,10 @@ public class NFA<T> implements Serializable {
this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
- this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
- this.computationStates = new LinkedList<>();
- this.states = new HashSet<>();
- this.startEventCounter = 1;
- this.stateTransitionComparator = new StateTransitionComparator<>();
+ sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+ computationStates = new LinkedList<>();
+
+ states = new HashSet<>();
}
public Set<State<T>> getStates() {
@@ -150,7 +144,7 @@ public class NFA<T> implements Serializable {
states.add(state);
if (state.isStart()) {
- computationStates.add(new ComputationState<>(state, null, -1L, null, -1L));
+ computationStates.add(ComputationState.createStartState(state));
}
}
@@ -201,8 +195,8 @@ public class NFA<T> implements Serializable {
}
// remove computation state which has exceeded the window length
- sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+ sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+ sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
newComputationStates = Collections.emptyList();
} else if (event != null) {
@@ -218,8 +212,8 @@ public class NFA<T> implements Serializable {
result.addAll(matches);
// remove found patterns because they are no longer needed
- sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
- sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
} else {
// add new computation state; it will be processed once the next event arrives
computationStates.add(newComputationState);
@@ -252,8 +246,7 @@ public class NFA<T> implements Serializable {
return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
sharedBuffer.equals(other.sharedBuffer) &&
states.equals(other.states) &&
- windowTime == other.windowTime &&
- startEventCounter == other.startEventCounter;
+ windowTime == other.windowTime;
} else {
return false;
}
@@ -261,12 +254,80 @@ public class NFA<T> implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+ return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+ }
+
+ private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
+ return s1.getName().equals(s2.getName());
}
/**
+ * Class for storing resolved transitions. It counts at insert time the number of
+ * branching transitions both for IGNORE and TAKE actions.
+ */
+ private static class OutgoingEdges<T> {
+ private List<StateTransition<T>> edges = new ArrayList<>();
+
+ private final State<T> currentState;
+
+ private int totalTakeBranches = 0;
+ private int totalIgnoreBranches = 0;
+
+ OutgoingEdges(final State<T> currentState) {
+ this.currentState = currentState;
+ }
+
+ void add(StateTransition<T> edge) {
+
+ if (!isSelfIgnore(edge)) {
+ if (edge.getAction() == StateTransitionAction.IGNORE) {
+ totalIgnoreBranches++;
+ } else if (edge.getAction() == StateTransitionAction.TAKE) {
+ totalTakeBranches++;
+ }
+ }
+
+ edges.add(edge);
+ }
+
+ int getTotalIgnoreBranches() {
+ return totalIgnoreBranches;
+ }
+ int getTotalTakeBranches() {
+ return totalTakeBranches;
+ }
+
+ List<StateTransition<T>> getEdges() {
+ return edges;
+ }
+
+ private boolean isSelfIgnore(final StateTransition<T> edge) {
+ return isEquivalentState(edge.getTargetState(), currentState) &&
+ edge.getAction() == StateTransitionAction.IGNORE;
+ }
+ }
+
+
+ /**
* Computes the next computation states based on the given computation state, the current event,
- * its timestamp and the internal state machine.
+ * its timestamp and the internal state machine. The algorithm is:
+ *
+ * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
+ * 2. Perform transitions:
+ * a) IGNORE (links in {@link SharedBuffer} will still point to the previous event)
+ * - do not perform for Start State - special case
+ * - if it stays in the same state, increase the current stage for future use by the number
+ * of outgoing edges
+ * - if after PROCEED increase current stage and add new stage (as we change the state)
+ * - lock the entry in {@link SharedBuffer} as it is needed in the created branch
+ * b) TAKE (links in {@link SharedBuffer} will point to the current event)
+ * - add entry to the shared buffer with version of the current computation state
+ * - add stage and then increase with number of takes for the future computation states
+ * - peek to the next state if it has PROCEED path to a Final State, if true create
+ * Final ComputationState to emit results
+ * 3. Handle the Start State, as it always has to remain
+ * 4. Release the corresponding entries in {@link SharedBuffer}.
+ *
*
* @param computationState Current computation state
* @param event Current event which is processed
@@ -277,31 +338,179 @@ public class NFA<T> implements Serializable {
final ComputationState<T> computationState,
final T event,
final long timestamp) {
- Stack<State<T>> states = new Stack<>();
- List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
- State<T> state = computationState.getState();
- states.push(state);
+ final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);
+
+ // Create the computing version based on the previously computed edges
+ // We need to defer the creation of computation states until we know how many edges start
+ // at this computation state so that we can assign proper version
+ final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+ int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
+ int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+
+ final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
+ for (StateTransition<T> edge : edges) {
+ switch (edge.getAction()) {
+ case IGNORE: {
+ if (!computationState.isStartState()) {
+ final DeweyNumber version;
+ if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
+ //Stay in the same state (it can be either looping one or singleton)
+ final int toIncrease = calculateIncreasingSelfState(
+ outgoingEdges.getTotalIgnoreBranches(),
+ outgoingEdges.getTotalTakeBranches());
+ version = computationState.getVersion().increase(toIncrease);
+ } else {
+ //IGNORE after PROCEED
+ version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+ ignoreBranchesToVisit--;
+ }
- boolean branched = false;
- while (!states.isEmpty()) {
- State<T> currentState = states.pop();
- final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
+ resultingComputationStates.add(
+ ComputationState.createState(
+ edge.getTargetState(),
+ computationState.getPreviousState(),
+ computationState.getEvent(),
+ computationState.getTimestamp(),
+ version,
+ computationState.getStartTimestamp()
+ )
+ );
+ sharedBuffer.lock(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ }
+ }
+ break;
+ case TAKE:
+ final State<T> newState = edge.getTargetState();
+ final State<T> consumingState = edge.getSourceState();
+ final State<T> previousEventState = computationState.getPreviousState();
+
+ final T previousEvent = computationState.getEvent();
+ final DeweyNumber currentVersion = computationState.getVersion();
+
+ final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+ takeBranchesToVisit--;
+
+ final long startTimestamp;
+ if (computationState.isStartState()) {
+ startTimestamp = timestamp;
+ sharedBuffer.put(
+ consumingState.getName(),
+ event,
+ timestamp,
+ currentVersion);
+ } else {
+ startTimestamp = computationState.getStartTimestamp();
+ sharedBuffer.put(
+ consumingState.getName(),
+ event,
+ timestamp,
+ previousEventState.getName(),
+ previousEvent,
+ computationState.getTimestamp(),
+ currentVersion);
+ }
- // this is for when we restore from legacy. In that case, the comparator is null
- // as it did not exist in the previous Flink versions, so we have to initialize it here.
+ // a new computation state is referring to the shared entry
+ sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+ resultingComputationStates.add(ComputationState.createState(
+ newState,
+ consumingState,
+ event,
+ timestamp,
+ newComputationStateVersion,
+ startTimestamp
+ ));
+
+ //check if newly created state is optional (have a PROCEED path to Final state)
+ final State<T> finalState = findFinalStateAfterProceed(newState, event);
+ if (finalState != null) {
+ sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ resultingComputationStates.add(ComputationState.createState(
+ finalState,
+ consumingState,
+ event,
+ timestamp,
+ newComputationStateVersion,
+ startTimestamp));
+ }
+ break;
+ }
+ }
- if (stateTransitionComparator == null) {
- stateTransitionComparator = new StateTransitionComparator();
+ if (computationState.isStartState()) {
+ final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches());
+ final ComputationState<T> startState = createStartState(computationState, totalBranches);
+ resultingComputationStates.add(startState);
+ }
+
+ if (computationState.getEvent() != null) {
+ // release the shared entry referenced by the current computation state.
+ sharedBuffer.release(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ // try to remove unnecessary shared buffer entries
+ sharedBuffer.remove(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ }
+
+ return resultingComputationStates;
+ }
+
+ private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+ final Stack<State<T>> statesToCheck = new Stack<>();
+ statesToCheck.push(state);
+
+ try {
+ while (!statesToCheck.isEmpty()) {
+ final State<T> currentState = statesToCheck.pop();
+ for (StateTransition<T> transition : currentState.getStateTransitions()) {
+ if (transition.getAction() == StateTransitionAction.PROCEED &&
+ checkFilterCondition(transition.getCondition(), event)) {
+ if (transition.getTargetState().isFinal()) {
+ return transition.getTargetState();
+ } else {
+ statesToCheck.push(transition.getTargetState());
+ }
+ }
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException("Failure happened in filter function.", e);
+ }
+
+ return null;
+ }
+
+ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
+ return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+ }
+
+ private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) {
+ final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
+ return ComputationState.createStartState(computationState.getState(), startVersion);
+ }
- // impose the IGNORE will be processed last
- Collections.sort(stateTransitions, stateTransitionComparator);
+ private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
+ final Stack<State<T>> states = new Stack<>();
+ states.push(computationState.getState());
+ final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+ // First create all outgoing edges, so that we can reason about the Dewey version
+ while (!states.isEmpty()) {
+ State<T> currentState = states.pop();
+ Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
// check all state transitions for each state
- for (StateTransition<T> stateTransition: stateTransitions) {
+ for (StateTransition<T> stateTransition : stateTransitions) {
try {
- if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
+ if (checkFilterCondition(stateTransition.getCondition(), event)) {
// filter condition is true
switch (stateTransition.getAction()) {
case PROCEED:
@@ -310,73 +519,8 @@ public class NFA<T> implements Serializable {
states.push(stateTransition.getTargetState());
break;
case IGNORE:
- final DeweyNumber version;
- if (branched) {
- version = computationState.getVersion().increase();
- } else {
- version = computationState.getVersion();
- }
- resultingComputationStates.add(new ComputationState<T>(
- computationState.getState(),
- computationState.getEvent(),
- computationState.getTimestamp(),
- version,
- computationState.getStartTimestamp()));
-
- // we have a new computation state referring to the same the shared entry
- // the lock of the current computation is released later on
- sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- break;
case TAKE:
- final State<T> newState = stateTransition.getTargetState();
- final DeweyNumber oldVersion;
- final DeweyNumber newComputationStateVersion;
- final State<T> previousState = computationState.getState();
- final T previousEvent = computationState.getEvent();
- final long previousTimestamp;
- final long startTimestamp;
-
- if (computationState.isStartState()) {
- oldVersion = new DeweyNumber(startEventCounter++);
- newComputationStateVersion = oldVersion.addStage();
- startTimestamp = timestamp;
- previousTimestamp = -1L;
-
- } else {
- startTimestamp = computationState.getStartTimestamp();
- previousTimestamp = computationState.getTimestamp();
- oldVersion = computationState.getVersion();
-
- branched = true;
- newComputationStateVersion = oldVersion.addStage();
- }
-
- if (previousState.isStart()) {
- sharedBuffer.put(
- newState,
- event,
- timestamp,
- oldVersion);
- } else {
- sharedBuffer.put(
- newState,
- event,
- timestamp,
- previousState,
- previousEvent,
- previousTimestamp,
- oldVersion);
- }
-
- // a new computation state is referring to the shared entry
- sharedBuffer.lock(newState, event, timestamp);
-
- resultingComputationStates.add(new ComputationState<T>(
- newState,
- event,
- timestamp,
- newComputationStateVersion,
- startTimestamp));
+ outgoingEdges.add(stateTransition);
break;
}
}
@@ -385,19 +529,12 @@ public class NFA<T> implements Serializable {
}
}
}
+ return outgoingEdges;
+ }
- if (computationState.isStartState()) {
- // a computation state is always kept if it refers to a starting state because every
- // new element can start a new pattern
- resultingComputationStates.add(computationState);
- } else {
- // release the shared entry referenced by the current computation state.
- sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- // try to remove unnecessary shared buffer entries
- sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- }
- return resultingComputationStates;
+ private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
+ return condition == null || condition.filter(event);
}
/**
@@ -409,8 +546,8 @@ public class NFA<T> implements Serializable {
* @return Collection of event sequences which end in the given computation state
*/
private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
- Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns(
- computationState.getState(),
+ Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+ computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getVersion());
@@ -420,19 +557,20 @@ public class NFA<T> implements Serializable {
TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
// generate the correct names from the collection of LinkedHashMultimaps
- for (LinkedHashMultimap<State<T>, T> path: paths) {
+ for (LinkedHashMultimap<String, T> path: paths) {
Map<String, T> resultPath = new HashMap<>();
- for (State<T> key: path.keySet()) {
+ for (String key: path.keySet()) {
int counter = 0;
Set<T> events = path.get(key);
// we iterate over the elements in insertion order
for (T event: events) {
resultPath.put(
- events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
+ events.size() > 1 ? generateStateName(key, counter): key,
// copy the element so that the user can change it
serializer.isImmutableType() ? event : serializer.copy(event)
);
+ counter++;
}
}
@@ -472,6 +610,7 @@ public class NFA<T> implements Serializable {
private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
oos.writeObject(computationState.getState());
+ oos.writeObject(computationState.getPreviousState());
oos.writeLong(computationState.getTimestamp());
oos.writeObject(computationState.getVersion());
oos.writeLong(computationState.getStartTimestamp());
@@ -490,6 +629,7 @@ public class NFA<T> implements Serializable {
@SuppressWarnings("unchecked")
private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
final State<T> state = (State<T>)ois.readObject();
+ final State<T> previousState = (State<T>)ois.readObject();
final long timestamp = ois.readLong();
final DeweyNumber version = (DeweyNumber)ois.readObject();
final long startTimestamp = ois.readLong();
@@ -504,7 +644,7 @@ public class NFA<T> implements Serializable {
event = null;
}
- return new ComputationState<>(state, event, timestamp, version, startTimestamp);
+ return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
}
/**
@@ -629,20 +769,4 @@ public class NFA<T> implements Serializable {
return getClass().hashCode();
}
}
-
- /**
- * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
- */
- private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
-
- private static final long serialVersionUID = -2775474935413622278L;
-
- @Override
- public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
- if (o1.getAction() == o2.getAction()) {
- return 0;
- }
- return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index b7e288b..e6a8c75 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
SharedBufferEntry<K, V> entry = get(key, value, timestamp);
if (entry != null) {
- extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+ extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
// use a depth first search to reconstruct the previous relations
while (!extractionStates.isEmpty()) {
- ExtractionState<K, V> extractionState = extractionStates.pop();
- DeweyNumber currentVersion = extractionState.getVersion();
+ final ExtractionState<K, V> extractionState = extractionStates.pop();
// current path of the depth first search
- Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+ final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+ final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
// termination criterion
- if (currentVersion.length() == 1) {
- LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+ if (currentEntry == null) {
+ final LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
while(!currentPath.isEmpty()) {
- SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+ final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
- completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+ completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
}
result.add(completePath);
} else {
- SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
// append state to the path
currentPath.push(currentEntry);
@@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
// we can only proceed if the current version is compatible to the version
// of this previous relation
+ final DeweyNumber currentVersion = extractionState.getVersion();
if (currentVersion.isCompatibleWith(edge.getVersion())) {
if (firstMatch) {
// for the first match we don't have to copy the current path
- extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+ extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
firstMatch = false;
} else {
- Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+ final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
copy.addAll(currentPath);
extractionStates.push(
- new ExtractionState<K, V>(
+ new ExtractionState<>(
edge.getTarget(),
edge.getVersion(),
copy));
@@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 50b2cf3..7bcb6ea 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.api.common.functions.FilterFunction;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,7 +45,7 @@ public class State<T> implements Serializable {
this.name = name;
this.stateType = stateType;
- stateTransitions = new ArrayList<StateTransition<T>>();
+ stateTransitions = new ArrayList<>();
}
public boolean isFinal() {
@@ -60,8 +62,32 @@ public class State<T> implements Serializable {
return stateTransitions;
}
- public void addStateTransition(final StateTransition<T> stateTransition) {
- stateTransitions.add(stateTransition);
+
+ private void addStateTransition(
+ final StateTransitionAction action,
+ final State<T> targetState,
+ final FilterFunction<T> condition) {
+ stateTransitions.add(new StateTransition<T>(this, action, targetState, condition));
+ }
+
+ public void addIgnore(final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.IGNORE, this, condition);
+ }
+
+ public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
+ }
+
+ public void addTake(final State<T> targetState, final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.TAKE, targetState, condition);
+ }
+
+ public void addProceed(final State<T> targetState, final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.PROCEED, targetState, condition);
+ }
+
+ public void addTake(final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.TAKE, this, condition);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index 479f28a..e3c7b7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable {
private static final long serialVersionUID = -4825345749997891838L;
private final StateTransitionAction action;
+ private final State<T> sourceState;
private final State<T> targetState;
private final FilterFunction<T> condition;
- public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) {
+ public StateTransition(
+ final State<T> sourceState,
+ final StateTransitionAction action,
+ final State<T> targetState,
+ final FilterFunction<T> condition) {
this.action = action;
this.targetState = targetState;
+ this.sourceState = sourceState;
this.condition = condition;
}
@@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable {
return targetState;
}
+ public State<T> getSourceState() {
+ return sourceState;
+ }
+
public FilterFunction<T> getCondition() {
return condition;
}
@@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable {
StateTransition<T> other = (StateTransition<T>) obj;
return action == other.action &&
+ sourceState.getName().equals(other.sourceState.getName()) &&
targetState.getName().equals(other.targetState.getName());
} else {
return false;
@@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable {
@Override
public int hashCode() {
// we have to take the name of targetState because the transition might be reflexive
- return Objects.hash(action, targetState.getName());
+ return Objects.hash(action, targetState.getName(), sourceState.getName());
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("StateTransition(").append(action).append(", ").append(targetState.getName());
+ builder.append("StateTransition(")
+ .append(action).append(", ")
+ .append(sourceState.getName()).append(", ")
+ .append(targetState.getName());
if (condition != null) {
builder.append(", with filter)");
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
index 70fc7fb..b8ca4e8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa;
* Set of actions when doing a state transition from a {@link State} to another.
*/
public enum StateTransitionAction {
- TAKE, // take the current event and assign it to the new state
- IGNORE, // ignore the current event and do the state transition
+ TAKE, // take the current event and assign it to the current state
+ IGNORE, // ignore the current event
PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
deleted file mode 100644
index a3bb5f4..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.compiler;
-
-/**
- * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
- * was not specified correctly.
- */
-public class MalformedPatternException extends RuntimeException {
-
- private static final long serialVersionUID = 7751134834983361543L;
-
- public MalformedPatternException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 18ed21f..b476c49 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
-import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FilterFunctions;
import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.NotFilterFunction;
import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
/**
@@ -42,7 +45,7 @@ import java.util.Set;
*/
public class NFACompiler {
- protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+ protected static final String ENDING_STATE_NAME = "$endState$";
/**
* Compiles the given pattern into a {@link NFA}.
@@ -74,88 +77,288 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
public static <T> NFAFactory<T> compileFactory(
- Pattern<T, ?> pattern,
- TypeSerializer<T> inputTypeSerializer,
+ final Pattern<T, ?> pattern,
+ final TypeSerializer<T> inputTypeSerializer,
boolean timeoutHandling) {
if (pattern == null) {
// return a factory for empty NFAs
- return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+ return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
} else {
- // set of all generated states
- Map<String, State<T>> states = new HashMap<>();
- long windowTime;
+ final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
+ nfaFactoryCompiler.compileFactory();
+ return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+ }
+ }
+
+ /**
+	 * Converts a {@link Pattern} into a graph of {@link State}. It enables sharing of
+ * compilation state across methods.
+ *
+ * @param <T>
+ */
+ private static class NFAFactoryCompiler<T> {
+
+ private final Set<String> usedNames = new HashSet<>();
+ private final List<State<T>> states = new ArrayList<>();
- // this is used to enforse pattern name uniqueness.
- Set<String> patternNames = new HashSet<>();
+ private long windowTime = 0;
+ private Pattern<T, ?> currentPattern;
- Pattern<T, ?> succeedingPattern;
- State<T> succeedingState;
- Pattern<T, ?> currentPattern = pattern;
+ NFAFactoryCompiler(final Pattern<T, ?> pattern) {
+ this.currentPattern = pattern;
+ }
+ /**
+ * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
+ * multiple NFAs.
+ */
+ void compileFactory() {
// we're traversing the pattern from the end to the beginning --> the first state is the final state
- State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
- patternNames.add(currentPattern.getName());
+ State<T> sinkState = createEndingState();
+ // add all the normal states
+ sinkState = createMiddleStates(sinkState);
+ // add the beginning state
+ createStartState(sinkState);
+ }
- states.put(currentPattern.getName(), currentState);
+ List<State<T>> getStates() {
+ return states;
+ }
+
+ long getWindowTime() {
+ return windowTime;
+ }
+
+ /**
+ * Creates the dummy Final {@link State} of the NFA graph.
+ * @return dummy Final state
+ */
+ private State<T> createEndingState() {
+ State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+ states.add(endState);
+ usedNames.add(ENDING_STATE_NAME);
windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
+ return endState;
+ }
- while (currentPattern.getPrevious() != null) {
- succeedingPattern = currentPattern;
- succeedingState = currentState;
- currentPattern = currentPattern.getPrevious();
+ /**
+ * Creates all the states between Start and Final state.
+ * @param sinkState the state that last state should point to (always the Final state)
+ * @return the next state after Start in the resulting graph
+ */
+ private State<T> createMiddleStates(final State<T> sinkState) {
- if (!patternNames.add(currentPattern.getName())) {
- throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
- "Pattern names must be unique.");
+ State<T> lastSink = sinkState;
+ while (currentPattern.getPrevious() != null) {
+ checkPatternNameUniqueness();
+
+ State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(sourceState);
+ usedNames.add(sourceState.getName());
+
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+ convertToLooping(sourceState, lastSink);
+
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+ sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
+ states.add(sourceState);
+ usedNames.add(sourceState.getName());
+ }
+ } else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
+ sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+ } else {
+ convertToSingletonState(sourceState, lastSink);
}
- Time currentWindowTime = currentPattern.getWindowTime();
+ currentPattern = currentPattern.getPrevious();
+ lastSink = sourceState;
+ final Time currentWindowTime = currentPattern.getWindowTime();
if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
// the window time is the global minimum of all window times of each state
windowTime = currentWindowTime.toMilliseconds();
}
+ }
+
+ return lastSink;
+ }
+
+ private void checkPatternNameUniqueness() {
+ if (usedNames.contains(currentPattern.getName())) {
+ throw new MalformedPatternException(
+ "Duplicate pattern name: " + currentPattern.getName() + ". " +
+ "Pattern names must be unique.");
+ }
+ }
+
+ /**
+ * Creates the Start {@link State} of the resulting NFA graph.
+	 * @param sinkState the state that Start state should point to (always first state of middle states)
+ * @return created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createStartState(State<T> sinkState) {
+ checkPatternNameUniqueness();
+
+ final State<T> beginningState;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+ final State<T> loopingState;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+ loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
+ states.add(loopingState);
+ } else {
+ loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
+ beginningState = loopingState;
+ }
+ convertToLooping(loopingState, sinkState, true);
+ } else {
+ if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
+ final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(timesState);
+ sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
+ }
- if (states.containsKey(currentPattern.getName())) {
- currentState = states.get(currentPattern.getName());
+ beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
+ convertToSingletonState(beginningState, sinkState);
+ }
+
+ states.add(beginningState);
+ usedNames.add(beginningState.getName());
+
+ return beginningState;
+ }
+
+ /**
+ * Converts the given state into a "complex" state consisting of given number of states with
+ * same {@link FilterFunction}
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ * @param times number of times the state should be copied
+ * @return the first state of the "complex" state, next state should point to it
+ */
+ private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
+ convertToSingletonState(sourceState, sinkState);
+ State<T> lastSink;
+ State<T> firstState = sourceState;
+ for (int i = 0; i < times - 1; i++) {
+ lastSink = firstState;
+ firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(firstState);
+ convertToSingletonState(firstState, lastSink);
+ }
+ return firstState;
+ }
+
+ /**
+ * Converts the given state into a simple single state. For an OPTIONAL state it also consists
+ * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
+ * in computation state graph can be created only once.
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState state that the state being converted should point to
+ */
+ @SuppressWarnings("unchecked")
+ private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+
+ final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final FilterFunction<T> trueFunction = FilterFunctions.trueFunction();
+ sourceState.addTake(sinkState, currentFilterFunction);
+
+ if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+ sourceState.addProceed(sinkState, trueFunction);
+ }
+
+ if (currentPattern instanceof FollowedByPattern) {
+ final State<T> ignoreState;
+ if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+ ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ ignoreState.addTake(sinkState, currentFilterFunction);
+ states.add(ignoreState);
} else {
- currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
- states.put(currentState.getName(), currentState);
+ ignoreState = sourceState;
}
+ sourceState.addIgnore(ignoreState, trueFunction);
+ }
+ }
- currentState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.TAKE,
- succeedingState,
- (FilterFunction<T>) succeedingPattern.getFilterFunction()));
-
- if (succeedingPattern instanceof FollowedByPattern) {
- // the followed by pattern entails a reflexive ignore transition
- currentState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.IGNORE,
- currentState,
- null
- ));
+ /**
+ * Patterns with quantifiers AT_LEAST_ONE_* are converted into pair of states: a singleton state and
+ * looping state. This method creates the first of the two.
+ *
+ * @param sinkState the state the newly created state should point to, it should be a looping state
+	 * @param stateType the type of the created state, as the NFA graph can also start with AT_LEAST_ONE_*
+ * @return the newly created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+
+ final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+
+ firstState.addTake(sinkState, currentFilterFunction);
+ if (currentPattern instanceof FollowedByPattern) {
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+ firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction));
+ } else {
+ firstState.addIgnore(FilterFunctions.<T>trueFunction());
}
}
+ return firstState;
+ }
- // add the beginning state
- final State<T> beginningState;
+ /**
+ * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+ * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+ * for each PROCEED transition branches in computation state graph can be created only once.
+ *
+ * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
+ * to enable combinations.
+ *
+	 * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ * @param isFirstState if the looping state is first of a graph
+ */
+ @SuppressWarnings("unchecked")
+ private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+
+ final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction();
+
+ sourceState.addProceed(sinkState, trueFunction);
+ sourceState.addTake(filterFunction);
+ if (currentPattern instanceof FollowedByPattern || isFirstState) {
+ final State<T> ignoreState = new State<>(
+ currentPattern.getName(),
+ State.StateType.Normal);
+
+
+ final FilterFunction<T> ignoreCondition;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+ ignoreCondition = new NotFilterFunction<>(filterFunction);
+ } else {
+ ignoreCondition = trueFunction;
+ }
- if (states.containsKey(BEGINNING_STATE_NAME)) {
- beginningState = states.get(BEGINNING_STATE_NAME);
- } else {
- beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
- states.put(BEGINNING_STATE_NAME, beginningState);
+ sourceState.addIgnore(ignoreState, ignoreCondition);
+ ignoreState.addTake(sourceState, filterFunction);
+ ignoreState.addIgnore(ignoreState, ignoreCondition);
+ states.add(ignoreState);
}
+ }
- beginningState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.TAKE,
- currentState,
- (FilterFunction<T>) currentPattern.getFilterFunction()
- ));
-
- return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+ /**
+ * Converts the given state into looping one. Looping state is one with TAKE edge to itself and
+ * PROCEED edge to the sinkState. It also consists of a similar state without the PROCEED edge, so that
+ * for each PROCEED transition branches in computation state graph can be created only once.
+ *
+	 * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ */
+ private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
+ convertToLooping(sourceState, sinkState, false);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
new file mode 100644
index 0000000..12e58ba
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+public class FilterFunctions<T> {
+
+ private FilterFunctions() {
+ }
+
+ public static <T> FilterFunction<T> trueFunction() {
+ return new FilterFunction<T>() {
+ @Override
+ public boolean filter(T value) throws Exception {
+ return true;
+ }
+ };
+ }
+
+ public static <T> FilterFunction<T> falseFunction() {
+ return new FilterFunction<T>() {
+ @Override
+ public boolean filter(T value) throws Exception {
+ return false;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
new file mode 100644
index 0000000..c85f3be
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+ private static final long serialVersionUID = 7751134834983361543L;
+
+ public MalformedPatternException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
new file mode 100644
index 0000000..a4fc8f5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which negates filter function.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotFilterFunction<T> implements FilterFunction<T> {
+ private static final long serialVersionUID = -2109562093871155005L;
+
+ private final FilterFunction<T> original;
+
+ public NotFilterFunction(final FilterFunction<T> original) {
+ this.original = original;
+ }
+
+ @Override
+ public boolean filter(T value) throws Exception {
+ return !original.filter(value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7ea675f..7b4d9c7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
/**
* Base class for a pattern definition.
@@ -53,6 +54,10 @@ public class Pattern<T, F extends T> {
// window length in which the pattern match has to occur
private Time windowTime;
+ private Quantifier quantifier = Quantifier.ONE;
+
+ private int times;
+
protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
this.name = name;
this.previous = previous;
@@ -74,6 +79,14 @@ public class Pattern<T, F extends T> {
return windowTime;
}
+ public Quantifier getQuantifier() {
+ return quantifier;
+ }
+
+ public int getTimes() {
+ return times;
+ }
+
/**
* Specifies a filter condition which has to be fulfilled by an event in order to be matched.
*
@@ -183,4 +196,106 @@ public class Pattern<T, F extends T> {
return new Pattern<X, X>(name, null);
}
+ /**
+ * Specifies that this pattern can occur zero or more times(kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene star operator
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> zeroOrMore() {
+ return zeroOrMore(true);
+ }
+
+ /**
+ * Specifies that this pattern can occur zero or more times(kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * If eagerness is enabled for a pattern A*B and sequence A1 A2 B will generate patterns:
+ * B, A1 B and A1 A2 B. If disabled B, A1 B, A2 B and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene star operator
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> zeroOrMore(final boolean eager) {
+ checkIfQuantifierApplied();
+ if (eager) {
+ this.quantifier = Quantifier.ZERO_OR_MORE_EAGER;
+ } else {
+ this.quantifier = Quantifier.ZERO_OR_MORE_COMBINATIONS;
+ }
+ return this;
+ }
+
+ /**
+	 * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most infinite number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene plus operator
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> oneOrMore() {
+ return oneOrMore(true);
+ }
+
+ /**
+	 * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most infinite number of events can be matched in this state.
+ *
+ * If eagerness is enabled for a pattern A+B and sequence A1 A2 B will generate patterns:
+ * A1 B and A1 A2 B. If disabled A1 B, A2 B and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene plus operator
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> oneOrMore(final boolean eager) {
+ checkIfQuantifierApplied();
+ if (eager) {
+ this.quantifier = Quantifier.ONE_OR_MORE_EAGER;
+ } else {
+ this.quantifier = Quantifier.ONE_OR_MORE_COMBINATIONS;
+ }
+ return this;
+ }
+
+ /**
+ * Specifies that this pattern can occur zero or once.
+ *
+ * @return The same pattern with applied Kleene ? operator
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> optional() {
+ checkIfQuantifierApplied();
+ this.quantifier = Quantifier.OPTIONAL;
+ return this;
+ }
+
+ /**
+ * Specifies exact number of times that this pattern should be matched.
+ *
+ * @param times number of times matching event must appear
+ * @return The same pattern with number of times applied
+ *
+ * @throws MalformedPatternException if quantifier already applied
+ */
+ public Pattern<T, F> times(int times) {
+ checkIfQuantifierApplied();
+ Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
+ this.quantifier = Quantifier.TIMES;
+ this.times = times;
+ return this;
+ }
+
+ private void checkIfQuantifierApplied() {
+ if (this.quantifier != Quantifier.ONE) {
+ throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
new file mode 100644
index 0000000..7abe9bd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import java.util.EnumSet;
+
+public enum Quantifier {
+ ONE,
+ ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
+ ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+ ONE_OR_MORE_EAGER(
+ QuantifierProperty.LOOPING,
+ QuantifierProperty.EAGER,
+ QuantifierProperty.AT_LEAST_ONE),
+ ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
+ TIMES,
+ OPTIONAL;
+
+ private final EnumSet<QuantifierProperty> properties;
+
+ Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) {
+ this.properties = EnumSet.of(first, rest);
+ }
+
+ Quantifier() {
+ this.properties = EnumSet.noneOf(QuantifierProperty.class);
+ }
+
+ public boolean hasProperty(QuantifierProperty property) {
+ return properties.contains(property);
+ }
+
+ public enum QuantifierProperty {
+ LOOPING,
+ EAGER,
+ AT_LEAST_ONE
+ }
+
+}
[2/3] flink git commit: [FLINK-3318] Add support for quantifiers to
CEP's pattern API
Posted by kk...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index ccae848..825ba957 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -46,9 +46,9 @@ public class NFAITCase extends TestLogger {
public void testSimplePatternNFA() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
- Event startEvent = new Event(42, "start", 1.0);
+ Event startEvent = new Event(41, "start", 1.0);
SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
- Event endEvent= new Event(43, "end", 1.0);
+ Event endEvent = new Event(43, "end", 1.0);
inputEvents.add(new StreamRecord<Event>(startEvent, 1));
inputEvents.add(new StreamRecord<Event>(new Event(43, "foobar", 1.0), 2));
@@ -102,6 +102,99 @@ public class NFAITCase extends TestLogger {
assertEquals(endEvent, patternMap.get("end"));
}
+ @Test
+ public void testStrictContinuityWithResults() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event end = new Event(42, "b", 4.0);
+
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(end, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).next("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(1, allPatterns.size());
+ assertEquals(Sets.<Set<Event>>newHashSet(
+ Sets.newHashSet(middleEvent1, end)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testStrictContinuityNoResults() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "c", 3.0);
+ Event end = new Event(43, "b", 4.0);
+
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(end, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).next("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ }
+ }
+
+ assertEquals(Sets.newHashSet(), resultingPatterns);
+ }
+
/**
* Tests that the NFA successfully filters out expired elements with respect to the window
* length
@@ -327,6 +420,1247 @@ public class NFAITCase extends TestLogger {
), patterns);
}
+ @Test
+ public void testComplexBranchingAfterKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+ Event end2 = new Event(45, "d", 6.0);
+ Event end3 = new Event(46, "d", 7.0);
+ Event end4 = new Event(47, "e", 8.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+ inputEvents.add(new StreamRecord<>(end2, 7));
+ inputEvents.add(new StreamRecord<>(end3, 8));
+ inputEvents.add(new StreamRecord<>(end4, 9));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ })
+ .followedBy("end2").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ })
+ .followedBy("end3").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("e");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(16, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent1, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent2, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent3, end1, end2, end4),
+ Sets.newHashSet(startEvent, end1, end2, end4),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent1, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent2, end1, end3, end4),
+ Sets.newHashSet(startEvent, middleEvent3, end1, end3, end4),
+ Sets.newHashSet(startEvent, end1, end3, end4)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(4, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+ Sets.newHashSet(startEvent, middleEvent1, end1),
+ Sets.newHashSet(startEvent, middleEvent2, end1),
+ Sets.newHashSet(startEvent, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testEagerKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(4, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+ Sets.newHashSet(startEvent, middleEvent1, end1),
+ Sets.newHashSet(startEvent, end1)
+ ), resultingPatterns);
+ }
+
+
+ @Test
+ public void testBeginWithKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event middleEvent1 = new Event(40, "a", 2.0);
+ Event middleEvent2 = new Event(41, "a", 3.0);
+ Event middleEvent3 = new Event(41, "a", 3.0);
+ Event end = new Event(42, "b", 4.0);
+
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore().followedBy("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(7, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3, end),
+ Sets.newHashSet(middleEvent1, middleEvent2, end),
+ Sets.newHashSet(middleEvent2, middleEvent3, end),
+ Sets.newHashSet(middleEvent1, end),
+ Sets.newHashSet(middleEvent2, end),
+ Sets.newHashSet(middleEvent3, end),
+ Sets.newHashSet(end)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testKleeneStarAfterKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "d", 3.0);
+ Event middleEvent3 = new Event(43, "d", 4.0);
+ Event end = new Event(44, "e", 4.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle-first").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false).followedBy("middle-second").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("e");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(8, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent3, end),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end),
+ Sets.newHashSet(startEvent, middleEvent2, middleEvent3, end),
+ Sets.newHashSet(startEvent, middleEvent3, end),
+ Sets.newHashSet(startEvent, middleEvent2, end),
+ Sets.newHashSet(startEvent, middleEvent1, end),
+ Sets.newHashSet(startEvent, end)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testKleeneStarAfterBranching() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event merging = new Event(42, "f", 3.0);
+ Event kleene1 = new Event(43, "d", 4.0);
+ Event kleene2 = new Event(44, "d", 4.0);
+ Event end = new Event(45, "e", 4.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(merging, 5));
+ inputEvents.add(new StreamRecord<>(kleene1, 6));
+ inputEvents.add(new StreamRecord<>(kleene2, 7));
+ inputEvents.add(new StreamRecord<>(end, 8));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("branching").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedBy("merging").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("f");
+ }
+ }).followedBy("kleene").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("e");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(8, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, merging, end),
+ Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, end),
+ Sets.newHashSet(startEvent, middleEvent1, merging, kleene2, end),
+ Sets.newHashSet(startEvent, middleEvent1, merging, kleene1, kleene2, end),
+ Sets.newHashSet(startEvent, middleEvent2, merging, end),
+ Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, end),
+ Sets.newHashSet(startEvent, middleEvent2, merging, kleene2, end),
+ Sets.newHashSet(startEvent, middleEvent2, merging, kleene1, kleene2, end)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testStrictContinuityNoResultsAfterKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event start = new Event(40, "d", 2.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 2.0);
+ Event middleEvent3 = new Event(43, "c", 3.0);
+ Event end = new Event(44, "b", 4.0);
+
+ inputEvents.add(new StreamRecord<>(start, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore()
+ .next("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ }
+ }
+
+ assertEquals(Sets.newHashSet(), resultingPatterns);
+ }
+
+ @Test
+ public void testStrictContinuityResultsAfterKleeneStar() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event start = new Event(40, "d", 2.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 2.0);
+ Event end = new Event(43, "b", 4.0);
+
+ inputEvents.add(new StreamRecord<>(start, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(end, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false)
+ .next("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(2, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(start, middleEvent1, middleEvent2, end),
+ Sets.newHashSet(start, middleEvent2, end)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testAtLeastOne() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(3, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+ Sets.newHashSet(startEvent, middleEvent1, end1),
+ Sets.newHashSet(startEvent, middleEvent2, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testBeginWithAtLeastOne() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent1 = new Event(41, "a", 2.0);
+ Event startEvent2 = new Event(42, "a", 3.0);
+ Event startEvent3 = new Event(42, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 3));
+ inputEvents.add(new StreamRecord<>(startEvent2, 4));
+ inputEvents.add(new StreamRecord<>(startEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(7, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent1, startEvent2, startEvent3, end1),
+ Sets.newHashSet(startEvent1, startEvent2, end1),
+ Sets.newHashSet(startEvent1, startEvent3, end1),
+ Sets.newHashSet(startEvent2, startEvent3, end1),
+ Sets.newHashSet(startEvent1, end1),
+ Sets.newHashSet(startEvent2, end1),
+ Sets.newHashSet(startEvent3, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testNextZeroOrMore() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "start", 1.0);
+ Event middleEvent1 = new Event(40, "middle", 2.0);
+ Event middleEvent2 = new Event(40, "middle", 3.0);
+ Event middleEvent3 = new Event(40, "middle", 4.0);
+ Event endEvent = new Event(46, "end", 1.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1L));
+ inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3L));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4L));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5L));
+ inputEvents.add(new StreamRecord<>(endEvent, 6L));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).next("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("middle");
+ }
+ }).zeroOrMore(false).followedBy("end").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(1, allPatterns.size());
+ assertEquals(Sets.<Set<Event>>newHashSet(
+ Sets.newHashSet(startEvent, endEvent)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testAtLeastOneEager() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore(true).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(3, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1),
+ Sets.newHashSet(startEvent, middleEvent1, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).optional().followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(2, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent, end1),
+ Sets.newHashSet(startEvent, end1)
+ ), resultingPatterns);
+ }
+
+
+ @Test
+ public void testTimes() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(1, allPatterns.size());
+ assertEquals(Sets.<Set<Event>>newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testStartWithTimes() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(2, allPatterns.size());
+ assertEquals(Sets.<Set<Event>>newHashSet(
+ Sets.newHashSet(middleEvent1, middleEvent2, end1),
+ Sets.newHashSet(middleEvent2, middleEvent3, end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testStartWithOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).optional().followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(2, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, end1),
+ Sets.newHashSet(end1)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testEndWithZeroOrMore() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(4, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2),
+ Sets.newHashSet(startEvent, middleEvent1),
+ Sets.newHashSet(startEvent)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testStartAndEndWithZeroOrMore() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "d", 5.0);
+ Event end2 = new Event(45, "d", 5.0);
+ Event end3 = new Event(46, "d", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+ inputEvents.add(new StreamRecord<>(end2, 6));
+ inputEvents.add(new StreamRecord<>(end3, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(6, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(middleEvent1, middleEvent2, middleEvent3),
+ Sets.newHashSet(middleEvent1, middleEvent2),
+ Sets.newHashSet(middleEvent1),
+ Sets.newHashSet(middleEvent2, middleEvent3),
+ Sets.newHashSet(middleEvent2),
+ Sets.newHashSet(middleEvent3)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testEndWithOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).optional();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(2, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1),
+ Sets.newHashSet(startEvent)
+ ), resultingPatterns);
+ }
+
+ @Test
+ public void testEndWithOneOrMore() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ Set<Set<Event>> resultingPatterns = new HashSet<>();
+ List<Collection<Event>> allPatterns = new ArrayList<>();
+
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ Collection<Map<String, Event>> patterns = nfa.process(
+ inputEvent.getValue(),
+ inputEvent.getTimestamp()).f0;
+
+ for (Map<String, Event> foundPattern : patterns) {
+ resultingPatterns.add(new HashSet<>(foundPattern.values()));
+ allPatterns.add(foundPattern.values());
+ }
+ }
+
+ assertEquals(3, allPatterns.size());
+ assertEquals(Sets.newHashSet(
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2, middleEvent3),
+ Sets.newHashSet(startEvent, middleEvent1, middleEvent2),
+ Sets.newHashSet(startEvent, middleEvent1)
+ ), resultingPatterns);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 9f65132..40a0e7e 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.FilterFunctions;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -51,12 +52,12 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
- State<Event> startingState = new State<>("", State.StateType.Start);
- State<Event> startState = new State<>("start", State.StateType.Normal);
- State<Event> endState = new State<>("end", State.StateType.Final);
- StateTransition<Event> starting2Start = new StateTransition<>(
- StateTransitionAction.TAKE,
- startState,
+ State<Event> startState = new State<>("start", State.StateType.Start);
+ State<Event> endState = new State<>("end", State.StateType.Normal);
+ State<Event> endingState = new State<>("", State.StateType.Final);
+
+ startState.addTake(
+ endState,
new FilterFunction<Event>() {
private static final long serialVersionUID = -4869589195918650396L;
@@ -64,12 +65,9 @@ public class NFATest extends TestLogger {
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
- }
- );
-
- StateTransition<Event> start2End = new StateTransition<>(
- StateTransitionAction.TAKE,
- endState,
+ });
+ endState.addTake(
+ endingState,
new FilterFunction<Event>() {
private static final long serialVersionUID = 2979804163709590673L;
@@ -77,18 +75,12 @@ public class NFATest extends TestLogger {
public boolean filter(Event value) throws Exception {
return value.getName().equals("end");
}
- }
- );
-
- StateTransition<Event> start2Start = new StateTransition<>(StateTransitionAction.IGNORE, startState, null);
-
- startingState.addStateTransition(starting2Start);
- startState.addStateTransition(start2End);
- startState.addStateTransition(start2Start);
+ });
+ endState.addIgnore(FilterFunctions.<Event>trueFunction());
- nfa.addState(startingState);
nfa.addState(startState);
nfa.addState(endState);
+ nfa.addState(endingState);
Set<Map<String, Event>> expectedPatterns = new HashSet<>();
@@ -196,8 +188,10 @@ public class NFATest extends TestLogger {
public <T> Collection<Map<String, T>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
Set<Map<String, T>> actualPatterns = new HashSet<>();
- for (StreamRecord<T> streamEvent: inputs) {
- Collection<Map<String, T>> matchedPatterns = nfa.process(streamEvent.getValue(), streamEvent.getTimestamp()).f0;
+ for (StreamRecord<T> streamEvent : inputs) {
+ Collection<Map<String, T>> matchedPatterns = nfa.process(
+ streamEvent.getValue(),
+ streamEvent.getTimestamp()).f0;
actualPatterns.addAll(matchedPatterns);
}
@@ -213,24 +207,12 @@ public class NFATest extends TestLogger {
State<Event> startState = new State<>("start", State.StateType.Normal);
State<Event> endState = new State<>("end", State.StateType.Final);
- StateTransition<Event> starting2Start = new StateTransition<>(
- StateTransitionAction.TAKE,
- startState,
- new NameFilter("start"));
- StateTransition<Event> start2End = new StateTransition<>(
- StateTransitionAction.TAKE,
- endState,
+ startingState.addTake(
+ new NameFilter("start"));
+ startState.addTake(
new NameFilter("end"));
-
- StateTransition<Event> start2Start = new StateTransition<>(
- StateTransitionAction.IGNORE,
- startState,
- null);
-
- startingState.addStateTransition(starting2Start);
- startState.addStateTransition(start2End);
- startState.addStateTransition(start2Start);
+ startState.addIgnore(null);
nfa.addState(startingState);
nfa.addState(startState);
@@ -253,12 +235,12 @@ public class NFATest extends TestLogger {
private NFA<Event> createStartEndNFA(long windowLength) {
NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
- State<Event> startingState = new State<>("", State.StateType.Start);
- State<Event> startState = new State<>("start", State.StateType.Normal);
- State<Event> endState = new State<>("end", State.StateType.Final);
- StateTransition<Event> starting2Start = new StateTransition<>(
- StateTransitionAction.TAKE,
- startState,
+ State<Event> startState = new State<>("start", State.StateType.Start);
+ State<Event> endState = new State<>("end", State.StateType.Normal);
+ State<Event> endingState = new State<>("", State.StateType.Final);
+
+ startState.addTake(
+ endState,
new FilterFunction<Event>() {
private static final long serialVersionUID = -4869589195918650396L;
@@ -267,10 +249,8 @@ public class NFATest extends TestLogger {
return value.getName().equals("start");
}
});
-
- StateTransition<Event> start2End = new StateTransition<>(
- StateTransitionAction.TAKE,
- endState,
+ endState.addTake(
+ endingState,
new FilterFunction<Event>() {
private static final long serialVersionUID = 2979804163709590673L;
@@ -279,19 +259,11 @@ public class NFATest extends TestLogger {
return value.getName().equals("end");
}
});
+ endState.addIgnore(FilterFunctions.<Event>trueFunction());
- StateTransition<Event> start2Start = new StateTransition<>(
- StateTransitionAction.IGNORE,
- startState,
- null);
-
- startingState.addStateTransition(starting2Start);
- startState.addStateTransition(start2End);
- startState.addStateTransition(start2Start);
-
- nfa.addState(startingState);
nfa.addState(startState);
nfa.addState(endState);
+ nfa.addState(endingState);
return nfa;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d11f3a8..93d78cc 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,9 +18,11 @@
package org.apache.flink.cep.nfa.compiler;
+import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
@@ -28,22 +30,45 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.MalformedPatternException;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.util.TestLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import static org.junit.Assert.assertTrue;
+import static com.google.common.collect.Sets.newHashSet;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class NFACompilerTest extends TestLogger {
+ private static final FilterFunction<Event> startFilter = new FilterFunction<Event>() {
+ private static final long serialVersionUID = 3314714776170474221L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getPrice() > 2;
+ }
+ };
+
+ private static final FilterFunction<Event> endFilter = new FilterFunction<Event>() {
+ private static final long serialVersionUID = 3990995859716364087L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ };
+
+ private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
+ .createSerializer(new ExecutionConfig());
+
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -81,83 +106,96 @@ public class NFACompilerTest extends TestLogger {
*/
@Test
public void testNFACompilerWithSimplePattern() {
- Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
- private static final long serialVersionUID = 3314714776170474221L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getPrice() > 2;
- }
- })
- .followedBy("middle").subtype(SubEvent.class)
- .next("end").where(new FilterFunction<Event>() {
- private static final long serialVersionUID = 3990995859716364087L;
+ Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+ .followedBy("middle").subtype(SubEvent.class)
+ .next("end").where(endFilter);
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- });
-
- TypeInformation<Event> typeInformation = TypeExtractor.createTypeInfo(Event.class);
-
- NFA<Event> nfa = NFACompiler.compile(pattern, typeInformation.createSerializer(new ExecutionConfig()), false);
+ NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
Set<State<Event>> states = nfa.getStates();
-
assertEquals(4, states.size());
Map<String, State<Event>> stateMap = new HashMap<>();
-
- for (State<Event> state: states) {
+ for (State<Event> state : states) {
stateMap.put(state.getName(), state);
}
- assertTrue(stateMap.containsKey(NFACompiler.BEGINNING_STATE_NAME));
- State<Event> beginningState = stateMap.get(NFACompiler.BEGINNING_STATE_NAME);
-
- assertTrue(beginningState.isStart());
-
assertTrue(stateMap.containsKey("start"));
State<Event> startState = stateMap.get("start");
+ assertTrue(startState.isStart());
+ final Set<Tuple2<String, StateTransitionAction>> startTransitions = unfoldTransitions(startState);
+ assertEquals(newHashSet(
+ Tuple2.of("middle", StateTransitionAction.TAKE)
+ ), startTransitions);
- Collection<StateTransition<Event>> startTransitions = startState.getStateTransitions();
- Map<String, StateTransition<Event>> startTransitionMap = new HashMap<>();
+ assertTrue(stateMap.containsKey("middle"));
+ State<Event> middleState = stateMap.get("middle");
+ final Set<Tuple2<String, StateTransitionAction>> middleTransitions = unfoldTransitions(middleState);
+ assertEquals(newHashSet(
+ Tuple2.of("middle", StateTransitionAction.IGNORE),
+ Tuple2.of("end", StateTransitionAction.TAKE)
+ ), middleTransitions);
- for (StateTransition<Event> transition: startTransitions) {
- startTransitionMap.put(transition.getTargetState().getName(), transition);
- }
+ assertTrue(stateMap.containsKey("end"));
+ State<Event> endState = stateMap.get("end");
+ final Set<Tuple2<String, StateTransitionAction>> endTransitions = unfoldTransitions(endState);
+ assertEquals(newHashSet(
+ Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE)
+ ), endTransitions);
+
+ assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+ State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+ assertTrue(endingState.isFinal());
+ assertEquals(0, endingState.getStateTransitions().size());
+ }
- assertEquals(2, startTransitionMap.size());
- assertTrue(startTransitionMap.containsKey("start"));
+ @Test
+ public void testNFACompilerWithKleeneStar() {
- StateTransition<Event> reflexiveTransition = startTransitionMap.get("start");
- assertEquals(StateTransitionAction.IGNORE, reflexiveTransition.getAction());
+ Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(startFilter)
+ .followedBy("middle").subtype(SubEvent.class).zeroOrMore()
+ .followedBy("end").where(endFilter);
- assertTrue(startTransitionMap.containsKey("middle"));
- StateTransition<Event> startMiddleTransition = startTransitionMap.get("middle");
- assertEquals(StateTransitionAction.TAKE, startMiddleTransition.getAction());
+ NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
- assertTrue(stateMap.containsKey("middle"));
- State<Event> middleState = stateMap.get("middle");
+ Set<State<Event>> states = nfa.getStates();
+ assertEquals(5, states.size());
- Map<String, StateTransition<Event>> middleTransitionMap = new HashMap<>();
- for (StateTransition<Event> transition: middleState.getStateTransitions()) {
- middleTransitionMap.put(transition.getTargetState().getName(), transition);
+ Set<Tuple2<String, Set<Tuple2<String, StateTransitionAction>>>> stateMap = new HashSet<>();
+ for (State<Event> state : states) {
+ stateMap.add(Tuple2.of(state.getName(), unfoldTransitions(state)));
}
- assertEquals(1, middleTransitionMap.size());
+ assertEquals(stateMap, newHashSet(
+ Tuple2.of("start", newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE))),
+ Tuple2.of("middle", newHashSet(
+ Tuple2.of("middle", StateTransitionAction.IGNORE),
+ Tuple2.of("middle", StateTransitionAction.TAKE)
+ )),
+ Tuple2.of("middle", newHashSet(
+ Tuple2.of("middle", StateTransitionAction.IGNORE),
+ Tuple2.of("middle", StateTransitionAction.TAKE),
+ Tuple2.of("end", StateTransitionAction.PROCEED)
+ )),
+ Tuple2.of("end", newHashSet(
+ Tuple2.of(NFACompiler.ENDING_STATE_NAME, StateTransitionAction.TAKE),
+ Tuple2.of("end", StateTransitionAction.IGNORE)
+ )),
+ Tuple2.of(NFACompiler.ENDING_STATE_NAME, Sets.newHashSet())
+ ));
- assertTrue(middleTransitionMap.containsKey("end"));
- StateTransition<Event> middleEndTransition = middleTransitionMap.get("end");
+ }
- assertEquals(StateTransitionAction.TAKE, middleEndTransition.getAction());
- assertTrue(stateMap.containsKey("end"));
- State<Event> endState = stateMap.get("end");
-
- assertTrue(endState.isFinal());
- assertEquals(0, endState.getStateTransitions().size());
+ private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
+ final Set<Tuple2<String, StateTransitionAction>> transitions = new HashSet<>();
+ for (StateTransition<T> transition : state.getStateTransitions()) {
+ transitions.add(Tuple2.of(
+ transition.getTargetState().getName(),
+ transition.getAction()));
+ }
+ return transitions;
}
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 98c3f5a..68b0419 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -184,4 +184,58 @@ public class PatternTest extends TestLogger {
assertEquals(previous2.getName(), "start");
}
+ @Test(expected = MalformedPatternException.class)
+ public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {
+
+ Pattern.begin("start").where(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) throws Exception {
+ return true;
+ }
+ }).oneOrMore().zeroOrMore();
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testPatternCanHaveQuantifierSpecifiedOnce2() throws Exception {
+
+ Pattern.begin("start").where(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) throws Exception {
+ return true;
+ }
+ }).zeroOrMore().times(1);
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testPatternCanHaveQuantifierSpecifiedOnce3() throws Exception {
+
+ Pattern.begin("start").where(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) throws Exception {
+ return true;
+ }
+ }).times(1).oneOrMore();
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testPatternCanHaveQuantifierSpecifiedOnce4() throws Exception {
+
+ Pattern.begin("start").where(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) throws Exception {
+ return true;
+ }
+ }).oneOrMore().oneOrMore(true);
+ }
+
+ @Test(expected = MalformedPatternException.class)
+ public void testPatternCanHaveQuantifierSpecifiedOnce5() throws Exception {
+
+ Pattern.begin("start").where(new FilterFunction<Object>() {
+ @Override
+ public boolean filter(Object value) throws Exception {
+ return true;
+ }
+ }).oneOrMore(true).zeroOrMore(true);
+ }
}