You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/03/23 12:47:26 UTC
[3/3] flink git commit: [FLINK-3318] Add support for quantifiers to
CEP's pattern API
[FLINK-3318] Add support for quantifiers to CEP's pattern API
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9001c4ef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9001c4ef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9001c4ef
Branch: refs/heads/master
Commit: 9001c4ef82a7a04d821252ac62bb7809a931c98a
Parents: d0695c0
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Sat Mar 18 20:53:00 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100
----------------------------------------------------------------------
docs/dev/libs/cep.md | 78 +-
.../flink/cep/scala/pattern/Pattern.scala | 81 +-
.../apache/flink/cep/nfa/ComputationState.java | 44 +-
.../org/apache/flink/cep/nfa/DeweyNumber.java | 17 +-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 398 ++++--
.../org/apache/flink/cep/nfa/SharedBuffer.java | 25 +-
.../java/org/apache/flink/cep/nfa/State.java | 32 +-
.../apache/flink/cep/nfa/StateTransition.java | 20 +-
.../flink/cep/nfa/StateTransitionAction.java | 4 +-
.../nfa/compiler/MalformedPatternException.java | 32 -
.../flink/cep/nfa/compiler/NFACompiler.java | 317 ++++-
.../flink/cep/pattern/FilterFunctions.java | 44 +
.../cep/pattern/MalformedPatternException.java | 32 +
.../flink/cep/pattern/NotFilterFunction.java | 42 +
.../org/apache/flink/cep/pattern/Pattern.java | 115 ++
.../apache/flink/cep/pattern/Quantifier.java | 54 +
.../org/apache/flink/cep/nfa/NFAITCase.java | 1338 +++++++++++++++++-
.../java/org/apache/flink/cep/nfa/NFATest.java | 90 +-
.../flink/cep/nfa/compiler/NFACompilerTest.java | 152 +-
.../apache/flink/cep/pattern/PatternTest.java | 54 +
20 files changed, 2595 insertions(+), 374 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 8047481..22cffbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -341,7 +341,45 @@ patternState.subtype(SubEvent.class);
patternState.within(Time.seconds(10));
{% endhighlight %}
</td>
- </tr>
+ </tr>
+ <tr>
+ <td><strong>ZeroOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches B, A1 B, and A1 A2 B. If disabled, it generates B, A1 B, A2 B, and A1 A2 B.</p>
+ {% highlight java %}
+ patternState.zeroOrMore();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>OneOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and at most an unbounded number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches A1 B and A1 A2 B. If disabled, it generates A1 B, A2 B, and A1 A2 B.</p>
+ {% highlight java %}
+ patternState.oneOrMore();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Optional</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero times or once.</p>
+ {% highlight java %}
+ patternState.optional();
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Times</strong></td>
+ <td>
+ <p>Specifies the exact number of times that this pattern should be matched.</p>
+ {% highlight java %}
+ patternState.times(2);
+ {% endhighlight %}
+ </td>
+ </tr>
</tbody>
</table>
</div>
@@ -419,6 +457,44 @@ patternState.within(Time.seconds(10))
{% endhighlight %}
</td>
</tr>
+ <tr>
+ <td><strong>ZeroOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero or more times (Kleene star). This means any number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (the default), the pattern A*B applied to the sequence A1 A2 B generates the matches B, A1 B, and A1 A2 B. If disabled, it generates B, A1 B, A2 B, and A1 A2 B.</p>
+ {% highlight scala %}
+ patternState.zeroOrMore()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>OneOrMore</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur one or more times (Kleene plus). This means at least one and at most an unbounded number of events can be matched in this state.</p>
+ <p>If eagerness is enabled (the default), the pattern A+B applied to the sequence A1 A2 B generates the matches A1 B and A1 A2 B. If disabled, it generates A1 B, A2 B, and A1 A2 B.</p>
+ {% highlight scala %}
+ patternState.oneOrMore()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Optional</strong></td>
+ <td>
+ <p>Specifies that this pattern can occur zero times or once.</p>
+ {% highlight scala %}
+ patternState.optional()
+ {% endhighlight %}
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Times</strong></td>
+ <td>
+ <p>Specifies the exact number of times that this pattern should be matched.</p>
+ {% highlight scala %}
+ patternState.times(2)
+ {% endhighlight %}
+ </td>
+ </tr>
</tbody>
</table>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index cc3b03c..5baf780 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -19,7 +19,7 @@ package org.apache.flink.cep.scala.pattern
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.cep
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Quantifier, Pattern => JPattern}
import org.apache.flink.streaming.api.windowing.time.Time
/**
@@ -59,6 +59,12 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
/**
*
+ * @return currently applied quantifier to this pattern
+ */
+ def getQuantifier: Quantifier = jPattern.getQuantifier
+
+ /**
+ *
* @return Filter condition for an event to be matched
*/
def getFilterFunction(): Option[FilterFunction[F]] = {
@@ -160,6 +166,79 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
wrapPattern(jPattern.getPrevious())
}
+ /**
+ * Specifies that this pattern can occur zero or more times (Kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene star operator
+ */
+ def zeroOrMore: Pattern[T, F] = {
+ jPattern.zeroOrMore()
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur zero or more times (Kleene star).
+ * This means any number of events can be matched in this state.
+ *
+ * If eagerness is enabled, the pattern A*B applied to the sequence A1 A2 B generates the
+ * matches B, A1 B, and A1 A2 B. If disabled, it generates B, A1 B, A2 B, and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene star operator
+ */
+ def zeroOrMore(eager: Boolean): Pattern[T, F] = {
+ jPattern.zeroOrMore(eager)
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most infinite number of events can be matched in this state.
+ *
+ * @return The same pattern with applied Kleene plus operator
+ */
+ def oneOrMore: Pattern[T, F] = {
+ jPattern.oneOrMore()
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur one or more times (Kleene plus).
+ * This means at least one and at most infinite number of events can be matched in this state.
+ *
+ * If eagerness is enabled, the pattern A+B applied to the sequence A1 A2 B generates the
+ * matches A1 B and A1 A2 B. If disabled, it generates A1 B, A2 B, and A1 A2 B.
+ *
+ * @param eager if true the pattern always consumes earlier events
+ * @return The same pattern with applied Kleene plus operator
+ */
+ def oneOrMore(eager: Boolean): Pattern[T, F] = {
+ jPattern.oneOrMore(eager)
+ this
+ }
+
+ /**
+ * Specifies that this pattern can occur zero times or once.
+ *
+ * @return The same pattern with applied Kleene ? operator
+ */
+ def optional: Pattern[T, F] = {
+ jPattern.optional()
+ this
+ }
+
+ /**
+ * Specifies the exact number of times that this pattern should be matched.
+ *
+ * @param times number of times matching event must appear
+ * @return The same pattern with number of times applied
+ */
+ def times(times: Int): Pattern[T, F] = {
+ jPattern.times(times)
+ this
+ }
+
}
object Pattern {
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 3f44fba..445d038 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.util.Preconditions;
+
/**
* Helper class which encapsulates the state of the NFA computation. It points to the current state,
* the last taken event, its occurrence timestamp, the current version and the starting timestamp
@@ -41,17 +43,21 @@ public class ComputationState<T> {
// Timestamp of the first element in the pattern
private final long startTimestamp;
- public ComputationState(
- final State<T> currentState,
- final T event,
- final long timestamp,
- final DeweyNumber version,
- final long startTimestamp) {
+ private final State<T> previousState;
+
+ private ComputationState(
+ final State<T> currentState,
+ final State<T> previousState,
+ final T event,
+ final long timestamp,
+ final DeweyNumber version,
+ final long startTimestamp) {
this.state = currentState;
this.event = event;
this.timestamp = timestamp;
this.version = version;
this.startTimestamp = startTimestamp;
+ this.previousState = previousState;
}
public boolean isFinalState() {
@@ -59,7 +65,7 @@ public class ComputationState<T> {
}
public boolean isStartState() {
- return state.isStart();
+ return state.isStart() && event == null;
}
public long getTimestamp() {
@@ -74,6 +80,10 @@ public class ComputationState<T> {
return state;
}
+ public State<T> getPreviousState() {
+ return previousState;
+ }
+
public T getEvent() {
return event;
}
@@ -81,4 +91,24 @@ public class ComputationState<T> {
public DeweyNumber getVersion() {
return version;
}
+
+ public static <T> ComputationState<T> createStartState(final State<T> state) {
+ Preconditions.checkArgument(state.isStart());
+ return new ComputationState<>(state, null, null, -1L, new DeweyNumber(1), -1L);
+ }
+
+ public static <T> ComputationState<T> createStartState(final State<T> state, final DeweyNumber version) {
+ Preconditions.checkArgument(state.isStart());
+ return new ComputationState<>(state, null, null, -1L, version, -1L);
+ }
+
+ public static <T> ComputationState<T> createState(
+ final State<T> currentState,
+ final State<T> previousState,
+ final T event,
+ final long timestamp,
+ final DeweyNumber version,
+ final long startTimestamp) {
+ return new ComputationState<>(currentState, previousState, event, timestamp, version, startTimestamp);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index bb9039d..fd3fafa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -44,6 +44,10 @@ public class DeweyNumber implements Serializable {
this.deweyNumber = deweyNumber;
}
+ public DeweyNumber(DeweyNumber number) {
+ this.deweyNumber = Arrays.copyOf(number.deweyNumber, number.deweyNumber.length);
+ }
+
/**
* Checks whether this dewey number is compatible to the other dewey number.
*
@@ -90,8 +94,19 @@ public class DeweyNumber implements Serializable {
* @return A new dewey number derived from this whose last digit is increased by one
*/
public DeweyNumber increase() {
+ return increase(1);
+ }
+
+ /**
+ * Creates a new dewey number from this such that its last digit is increased by the supplied
+ * number
+ *
+ * @param times how many times to increase the Dewey number
+ * @return A new dewey number derived from this whose last digit is increased by given number
+ */
+ public DeweyNumber increase(int times) {
int[] newDeweyNumber = Arrays.copyOf(deweyNumber, deweyNumber.length);
- newDeweyNumber[deweyNumber.length - 1]++;
+ newDeweyNumber[deweyNumber.length - 1] += times;
return new DeweyNumber(newDeweyNumber);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 257418a..3d42248 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,7 @@
package org.apache.flink.cep.nfa;
import com.google.common.collect.LinkedHashMultimap;
+import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
@@ -39,7 +40,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -87,7 +87,7 @@ public class NFA<T> implements Serializable {
/**
* Buffer used to store the matched events.
*/
- private final SharedBuffer<State<T>, T> sharedBuffer;
+ private final SharedBuffer<String, T> sharedBuffer;
/**
* A set of all the valid NFA states, as returned by the
@@ -98,7 +98,7 @@ public class NFA<T> implements Serializable {
/**
* The length of a windowed pattern, as specified using the
- * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
+ * {@link org.apache.flink.cep.pattern.Pattern#within(Time) Pattern.within(Time)}
* method.
*/
private final long windowTime;
@@ -109,9 +109,6 @@ public class NFA<T> implements Serializable {
*/
private final boolean handleTimeout;
- // Current starting index for the next dewey version number
- private int startEventCounter;
-
/**
* Current set of {@link ComputationState computation states} within the state machine.
* These are the "active" intermediate states that are waiting for new matching
@@ -119,8 +116,6 @@ public class NFA<T> implements Serializable {
*/
private transient Queue<ComputationState<T>> computationStates;
- private StateTransitionComparator<T> stateTransitionComparator;
-
public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
@@ -129,11 +124,10 @@ public class NFA<T> implements Serializable {
this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
- this.sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
- this.computationStates = new LinkedList<>();
- this.states = new HashSet<>();
- this.startEventCounter = 1;
- this.stateTransitionComparator = new StateTransitionComparator<>();
+ sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+ computationStates = new LinkedList<>();
+
+ states = new HashSet<>();
}
public Set<State<T>> getStates() {
@@ -150,7 +144,7 @@ public class NFA<T> implements Serializable {
states.add(state);
if (state.isStart()) {
- computationStates.add(new ComputationState<>(state, null, -1L, null, -1L));
+ computationStates.add(ComputationState.createStartState(state));
}
}
@@ -201,8 +195,8 @@ public class NFA<T> implements Serializable {
}
// remove computation state which has exceeded the window length
- sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
+ sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+ sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
newComputationStates = Collections.emptyList();
} else if (event != null) {
@@ -218,8 +212,8 @@ public class NFA<T> implements Serializable {
result.addAll(matches);
// remove found patterns because they are no longer needed
- sharedBuffer.release(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
- sharedBuffer.remove(newComputationState.getState(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
} else {
// add new computation state; it will be processed once the next event arrives
computationStates.add(newComputationState);
@@ -252,8 +246,7 @@ public class NFA<T> implements Serializable {
return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
sharedBuffer.equals(other.sharedBuffer) &&
states.equals(other.states) &&
- windowTime == other.windowTime &&
- startEventCounter == other.startEventCounter;
+ windowTime == other.windowTime;
} else {
return false;
}
@@ -261,12 +254,80 @@ public class NFA<T> implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime, startEventCounter);
+ return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+ }
+
+ private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
+ return s1.getName().equals(s2.getName());
}
/**
+ * Class for storing resolved transitions. It counts at insert time the number of
+ * branching transitions both for IGNORE and TAKE actions.
+ */
+ private static class OutgoingEdges<T> {
+ private List<StateTransition<T>> edges = new ArrayList<>();
+
+ private final State<T> currentState;
+
+ private int totalTakeBranches = 0;
+ private int totalIgnoreBranches = 0;
+
+ OutgoingEdges(final State<T> currentState) {
+ this.currentState = currentState;
+ }
+
+ void add(StateTransition<T> edge) {
+
+ if (!isSelfIgnore(edge)) {
+ if (edge.getAction() == StateTransitionAction.IGNORE) {
+ totalIgnoreBranches++;
+ } else if (edge.getAction() == StateTransitionAction.TAKE) {
+ totalTakeBranches++;
+ }
+ }
+
+ edges.add(edge);
+ }
+
+ int getTotalIgnoreBranches() {
+ return totalIgnoreBranches;
+ }
+ int getTotalTakeBranches() {
+ return totalTakeBranches;
+ }
+
+ List<StateTransition<T>> getEdges() {
+ return edges;
+ }
+
+ private boolean isSelfIgnore(final StateTransition<T> edge) {
+ return isEquivalentState(edge.getTargetState(), currentState) &&
+ edge.getAction() == StateTransitionAction.IGNORE;
+ }
+ }
+
+
+ /**
* Computes the next computation states based on the given computation state, the current event,
- * its timestamp and the internal state machine.
+ * its timestamp and the internal state machine. The algorithm is:
+ *
+ * 1. Decide on valid transitions and number of branching paths. See {@link OutgoingEdges}
+ * 2. Perform transitions:
+ * a) IGNORE (links in {@link SharedBuffer} will still point to the previous event)
+ * - do not perform for Start State - special case
+ * - if stays in the same state increase the current stage for future use with number of
+ * outgoing edges
+ * - if after PROCEED increase current stage and add new stage (as we change the state)
+ * - lock the entry in {@link SharedBuffer} as it is needed in the created branch
+ * b) TAKE (links in {@link SharedBuffer} will point to the current event)
+ * - add entry to the shared buffer with version of the current computation state
+ * - add stage and then increase with number of takes for the future computation states
+ * - peek to the next state if it has PROCEED path to a Final State, if true create
+ * Final ComputationState to emit results
+ * 3. Handle the Start State, as it always has to remain
+ * 4. Release the corresponding entries in {@link SharedBuffer}.
+ *
*
* @param computationState Current computation state
* @param event Current event which is processed
@@ -277,31 +338,179 @@ public class NFA<T> implements Serializable {
final ComputationState<T> computationState,
final T event,
final long timestamp) {
- Stack<State<T>> states = new Stack<>();
- List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
- State<T> state = computationState.getState();
- states.push(state);
+ final OutgoingEdges<T> outgoingEdges = createDecisionGraph(computationState, event);
+
+ // Create the computing version based on the previously computed edges
+ // We need to defer the creation of computation states until we know how many edges start
+ // at this computation state so that we can assign proper version
+ final List<StateTransition<T>> edges = outgoingEdges.getEdges();
+ int takeBranchesToVisit = Math.max(0, outgoingEdges.getTotalTakeBranches() - 1);
+ int ignoreBranchesToVisit = outgoingEdges.getTotalIgnoreBranches();
+
+ final List<ComputationState<T>> resultingComputationStates = new ArrayList<>();
+ for (StateTransition<T> edge : edges) {
+ switch (edge.getAction()) {
+ case IGNORE: {
+ if (!computationState.isStartState()) {
+ final DeweyNumber version;
+ if (isEquivalentState(edge.getTargetState(), computationState.getState())) {
+ //Stay in the same state (it can be either looping one or singleton)
+ final int toIncrease = calculateIncreasingSelfState(
+ outgoingEdges.getTotalIgnoreBranches(),
+ outgoingEdges.getTotalTakeBranches());
+ version = computationState.getVersion().increase(toIncrease);
+ } else {
+ //IGNORE after PROCEED
+ version = computationState.getVersion().increase(ignoreBranchesToVisit).addStage();
+ ignoreBranchesToVisit--;
+ }
- boolean branched = false;
- while (!states.isEmpty()) {
- State<T> currentState = states.pop();
- final List<StateTransition<T>> stateTransitions = new ArrayList<>(currentState.getStateTransitions());
+ resultingComputationStates.add(
+ ComputationState.createState(
+ edge.getTargetState(),
+ computationState.getPreviousState(),
+ computationState.getEvent(),
+ computationState.getTimestamp(),
+ version,
+ computationState.getStartTimestamp()
+ )
+ );
+ sharedBuffer.lock(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ }
+ }
+ break;
+ case TAKE:
+ final State<T> newState = edge.getTargetState();
+ final State<T> consumingState = edge.getSourceState();
+ final State<T> previousEventState = computationState.getPreviousState();
+
+ final T previousEvent = computationState.getEvent();
+ final DeweyNumber currentVersion = computationState.getVersion();
+
+ final DeweyNumber newComputationStateVersion = new DeweyNumber(currentVersion).addStage().increase(takeBranchesToVisit);
+ takeBranchesToVisit--;
+
+ final long startTimestamp;
+ if (computationState.isStartState()) {
+ startTimestamp = timestamp;
+ sharedBuffer.put(
+ consumingState.getName(),
+ event,
+ timestamp,
+ currentVersion);
+ } else {
+ startTimestamp = computationState.getStartTimestamp();
+ sharedBuffer.put(
+ consumingState.getName(),
+ event,
+ timestamp,
+ previousEventState.getName(),
+ previousEvent,
+ computationState.getTimestamp(),
+ currentVersion);
+ }
- // this is for when we restore from legacy. In that case, the comparator is null
- // as it did not exist in the previous Flink versions, so we have to initialize it here.
+ // a new computation state is referring to the shared entry
+ sharedBuffer.lock(consumingState.getName(), event, timestamp);
+
+ resultingComputationStates.add(ComputationState.createState(
+ newState,
+ consumingState,
+ event,
+ timestamp,
+ newComputationStateVersion,
+ startTimestamp
+ ));
+
+ //check if newly created state is optional (have a PROCEED path to Final state)
+ final State<T> finalState = findFinalStateAfterProceed(newState, event);
+ if (finalState != null) {
+ sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ resultingComputationStates.add(ComputationState.createState(
+ finalState,
+ consumingState,
+ event,
+ timestamp,
+ newComputationStateVersion,
+ startTimestamp));
+ }
+ break;
+ }
+ }
- if (stateTransitionComparator == null) {
- stateTransitionComparator = new StateTransitionComparator();
+ if (computationState.isStartState()) {
+ final int totalBranches = calculateIncreasingSelfState(outgoingEdges.getTotalIgnoreBranches(), outgoingEdges.getTotalTakeBranches());
+ final ComputationState<T> startState = createStartState(computationState, totalBranches);
+ resultingComputationStates.add(startState);
+ }
+
+ if (computationState.getEvent() != null) {
+ // release the shared entry referenced by the current computation state.
+ sharedBuffer.release(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ // try to remove unnecessary shared buffer entries
+ sharedBuffer.remove(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ }
+
+ return resultingComputationStates;
+ }
+
+ private State<T> findFinalStateAfterProceed(State<T> state, T event) {
+ final Stack<State<T>> statesToCheck = new Stack<>();
+ statesToCheck.push(state);
+
+ try {
+ while (!statesToCheck.isEmpty()) {
+ final State<T> currentState = statesToCheck.pop();
+ for (StateTransition<T> transition : currentState.getStateTransitions()) {
+ if (transition.getAction() == StateTransitionAction.PROCEED &&
+ checkFilterCondition(transition.getCondition(), event)) {
+ if (transition.getTargetState().isFinal()) {
+ return transition.getTargetState();
+ } else {
+ statesToCheck.push(transition.getTargetState());
+ }
+ }
+ }
}
+ } catch (Exception e) {
+ throw new RuntimeException("Failure happened in filter function.", e);
+ }
+
+ return null;
+ }
+
+ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
+ return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + 1;
+ }
+
+ private ComputationState<T> createStartState(final ComputationState<T> computationState, final int totalBranches) {
+ final DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
+ return ComputationState.createStartState(computationState.getState(), startVersion);
+ }
- // impose the IGNORE will be processed last
- Collections.sort(stateTransitions, stateTransitionComparator);
+ private OutgoingEdges<T> createDecisionGraph(ComputationState<T> computationState, T event) {
+ final Stack<State<T>> states = new Stack<>();
+ states.push(computationState.getState());
+ final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(computationState.getState());
+ //First create all outgoing edges, so to be able to reason about the Dewey version
+ while (!states.isEmpty()) {
+ State<T> currentState = states.pop();
+ Collection<StateTransition<T>> stateTransitions = currentState.getStateTransitions();
// check all state transitions for each state
- for (StateTransition<T> stateTransition: stateTransitions) {
+ for (StateTransition<T> stateTransition : stateTransitions) {
try {
- if (stateTransition.getCondition() == null || stateTransition.getCondition().filter(event)) {
+ if (checkFilterCondition(stateTransition.getCondition(), event)) {
// filter condition is true
switch (stateTransition.getAction()) {
case PROCEED:
@@ -310,73 +519,8 @@ public class NFA<T> implements Serializable {
states.push(stateTransition.getTargetState());
break;
case IGNORE:
- final DeweyNumber version;
- if (branched) {
- version = computationState.getVersion().increase();
- } else {
- version = computationState.getVersion();
- }
- resultingComputationStates.add(new ComputationState<T>(
- computationState.getState(),
- computationState.getEvent(),
- computationState.getTimestamp(),
- version,
- computationState.getStartTimestamp()));
-
- // we have a new computation state referring to the same the shared entry
- // the lock of the current computation is released later on
- sharedBuffer.lock(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- break;
case TAKE:
- final State<T> newState = stateTransition.getTargetState();
- final DeweyNumber oldVersion;
- final DeweyNumber newComputationStateVersion;
- final State<T> previousState = computationState.getState();
- final T previousEvent = computationState.getEvent();
- final long previousTimestamp;
- final long startTimestamp;
-
- if (computationState.isStartState()) {
- oldVersion = new DeweyNumber(startEventCounter++);
- newComputationStateVersion = oldVersion.addStage();
- startTimestamp = timestamp;
- previousTimestamp = -1L;
-
- } else {
- startTimestamp = computationState.getStartTimestamp();
- previousTimestamp = computationState.getTimestamp();
- oldVersion = computationState.getVersion();
-
- branched = true;
- newComputationStateVersion = oldVersion.addStage();
- }
-
- if (previousState.isStart()) {
- sharedBuffer.put(
- newState,
- event,
- timestamp,
- oldVersion);
- } else {
- sharedBuffer.put(
- newState,
- event,
- timestamp,
- previousState,
- previousEvent,
- previousTimestamp,
- oldVersion);
- }
-
- // a new computation state is referring to the shared entry
- sharedBuffer.lock(newState, event, timestamp);
-
- resultingComputationStates.add(new ComputationState<T>(
- newState,
- event,
- timestamp,
- newComputationStateVersion,
- startTimestamp));
+ outgoingEdges.add(stateTransition);
break;
}
}
@@ -385,19 +529,12 @@ public class NFA<T> implements Serializable {
}
}
}
+ return outgoingEdges;
+ }
- if (computationState.isStartState()) {
- // a computation state is always kept if it refers to a starting state because every
- // new element can start a new pattern
- resultingComputationStates.add(computationState);
- } else {
- // release the shared entry referenced by the current computation state.
- sharedBuffer.release(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- // try to remove unnecessary shared buffer entries
- sharedBuffer.remove(computationState.getState(), computationState.getEvent(), computationState.getTimestamp());
- }
- return resultingComputationStates;
+ private boolean checkFilterCondition(FilterFunction<T> condition, T event) throws Exception {
+ return condition == null || condition.filter(event);
}
/**
@@ -409,8 +546,8 @@ public class NFA<T> implements Serializable {
* @return Collection of event sequences which end in the given computation state
*/
private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
- Collection<LinkedHashMultimap<State<T>, T>> paths = sharedBuffer.extractPatterns(
- computationState.getState(),
+ Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+ computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp(),
computationState.getVersion());
@@ -420,19 +557,20 @@ public class NFA<T> implements Serializable {
TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
// generate the correct names from the collection of LinkedHashMultimaps
- for (LinkedHashMultimap<State<T>, T> path: paths) {
+ for (LinkedHashMultimap<String, T> path: paths) {
Map<String, T> resultPath = new HashMap<>();
- for (State<T> key: path.keySet()) {
+ for (String key: path.keySet()) {
int counter = 0;
Set<T> events = path.get(key);
// we iterate over the elements in insertion order
for (T event: events) {
resultPath.put(
- events.size() > 1 ? generateStateName(key.getName(), counter): key.getName(),
+ events.size() > 1 ? generateStateName(key, counter): key,
// copy the element so that the user can change it
serializer.isImmutableType() ? event : serializer.copy(event)
);
+ counter++;
}
}
@@ -472,6 +610,7 @@ public class NFA<T> implements Serializable {
private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
oos.writeObject(computationState.getState());
+ oos.writeObject(computationState.getPreviousState());
oos.writeLong(computationState.getTimestamp());
oos.writeObject(computationState.getVersion());
oos.writeLong(computationState.getStartTimestamp());
@@ -490,6 +629,7 @@ public class NFA<T> implements Serializable {
@SuppressWarnings("unchecked")
private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
final State<T> state = (State<T>)ois.readObject();
+ final State<T> previousState = (State<T>)ois.readObject();
final long timestamp = ois.readLong();
final DeweyNumber version = (DeweyNumber)ois.readObject();
final long startTimestamp = ois.readLong();
@@ -504,7 +644,7 @@ public class NFA<T> implements Serializable {
event = null;
}
- return new ComputationState<>(state, event, timestamp, version, startTimestamp);
+ return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
}
/**
@@ -629,20 +769,4 @@ public class NFA<T> implements Serializable {
return getClass().hashCode();
}
}
-
- /**
- * Comparator used for imposing the assumption that IGNORE is always the last StateTransition in a state.
- */
- private static final class StateTransitionComparator<T> implements Serializable, Comparator<StateTransition<T>> {
-
- private static final long serialVersionUID = -2775474935413622278L;
-
- @Override
- public int compare(final StateTransition<T> o1, final StateTransition<T> o2) {
- if (o1.getAction() == o2.getAction()) {
- return 0;
- }
- return o1.getAction() == StateTransitionAction.IGNORE ? 1 : -1;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index b7e288b..e6a8c75 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -212,28 +212,27 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
SharedBufferEntry<K, V> entry = get(key, value, timestamp);
if (entry != null) {
- extractionStates.add(new ExtractionState<K, V>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
+ extractionStates.add(new ExtractionState<>(entry, version, new Stack<SharedBufferEntry<K, V>>()));
// use a depth first search to reconstruct the previous relations
while (!extractionStates.isEmpty()) {
- ExtractionState<K, V> extractionState = extractionStates.pop();
- DeweyNumber currentVersion = extractionState.getVersion();
+ final ExtractionState<K, V> extractionState = extractionStates.pop();
// current path of the depth first search
- Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+ final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
+ final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
// termination criterion
- if (currentVersion.length() == 1) {
- LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
+ // the initial entry is non-null (checked above), so a null entry can only come from
+ // following an edge without a target; reaching one means the path is fully traversed
+ if (currentEntry == null) {
+ final LinkedHashMultimap<K, V> completePath = LinkedHashMultimap.create();
while(!currentPath.isEmpty()) {
- SharedBufferEntry<K, V> currentEntry = currentPath.pop();
+ final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
- completePath.put(currentEntry.getKey(), currentEntry.getValueTime().getValue());
+ completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
}
result.add(completePath);
} else {
- SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
// append state to the path
currentPath.push(currentEntry);
@@ -242,17 +241,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
// we can only proceed if the current version is compatible to the version
// of this previous relation
+ // NOTE(review): this value is loop-invariant and could be read once before the for loop
+ final DeweyNumber currentVersion = extractionState.getVersion();
if (currentVersion.isCompatibleWith(edge.getVersion())) {
if (firstMatch) {
// for the first match we don't have to copy the current path
- extractionStates.push(new ExtractionState<K, V>(edge.getTarget(), edge.getVersion(), currentPath));
+ extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
firstMatch = false;
} else {
- Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
+ final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
copy.addAll(currentPath);
extractionStates.push(
- new ExtractionState<K, V>(
+ new ExtractionState<>(
edge.getTarget(),
edge.getVersion(),
copy));
@@ -260,6 +260,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 50b2cf3..7bcb6ea 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import org.apache.flink.api.common.functions.FilterFunction;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,7 +45,7 @@ public class State<T> implements Serializable {
this.name = name;
this.stateType = stateType;
- stateTransitions = new ArrayList<StateTransition<T>>();
+ stateTransitions = new ArrayList<>();
}
public boolean isFinal() {
@@ -60,8 +62,32 @@ public class State<T> implements Serializable {
return stateTransitions;
}
- public void addStateTransition(final StateTransition<T> stateTransition) {
- stateTransitions.add(stateTransition);
+
+ /**
+ * Appends a transition with the given action from this state to {@code targetState},
+ * guarded by {@code condition}.
+ */
+ private void addStateTransition(
+ final StateTransitionAction action,
+ final State<T> targetState,
+ final FilterFunction<T> condition) {
+ stateTransitions.add(new StateTransition<T>(this, action, targetState, condition));
+ }
+
+ /** Adds an IGNORE transition from this state back to itself. */
+ public void addIgnore(final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.IGNORE, this, condition);
+ }
+
+ /** Adds an IGNORE transition from this state to {@code targetState}. */
+ public void addIgnore(final State<T> targetState,final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
+ }
+
+ /** Adds a TAKE transition from this state to {@code targetState}. */
+ public void addTake(final State<T> targetState, final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.TAKE, targetState, condition);
+ }
+
+ /** Adds a PROCEED transition from this state to {@code targetState}. */
+ public void addProceed(final State<T> targetState, final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.PROCEED, targetState, condition);
+ }
+
+ /** Adds a TAKE transition from this state back to itself (a self-loop). */
+ public void addTake(final FilterFunction<T> condition) {
+ addStateTransition(StateTransitionAction.TAKE, this, condition);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index 479f28a..e3c7b7a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -27,12 +27,18 @@ public class StateTransition<T> implements Serializable {
private static final long serialVersionUID = -4825345749997891838L;
private final StateTransitionAction action;
+ // state the transition leaves from; compared by name in equals() and hashCode()
+ private final State<T> sourceState;
private final State<T> targetState;
private final FilterFunction<T> condition;
- public StateTransition(final StateTransitionAction action, final State<T> targetState, final FilterFunction<T> condition) {
+ public StateTransition(
+ final State<T> sourceState,
+ final StateTransitionAction action,
+ final State<T> targetState,
+ final FilterFunction<T> condition) {
this.action = action;
this.targetState = targetState;
+ this.sourceState = sourceState;
this.condition = condition;
}
@@ -44,6 +50,10 @@ public class StateTransition<T> implements Serializable {
return targetState;
}
+ /** Returns the state this transition starts from. */
+ public State<T> getSourceState() {
+ return sourceState;
+ }
+
public FilterFunction<T> getCondition() {
return condition;
}
@@ -55,6 +65,7 @@ public class StateTransition<T> implements Serializable {
StateTransition<T> other = (StateTransition<T>) obj;
return action == other.action &&
+ sourceState.getName().equals(other.sourceState.getName()) &&
targetState.getName().equals(other.targetState.getName());
} else {
return false;
@@ -64,14 +75,17 @@ public class StateTransition<T> implements Serializable {
@Override
public int hashCode() {
// we have to take the name of targetState because the transition might be reflexive
- return Objects.hash(action, targetState.getName());
+ // the source state's name is hashed as well, consistent with equals()
+ return Objects.hash(action, targetState.getName(), sourceState.getName());
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("StateTransition(").append(action).append(", ").append(targetState.getName());
+ builder.append("StateTransition(")
+ .append(action).append(", ")
+ .append(sourceState.getName()).append(", ")
+ .append(targetState.getName());
if (condition != null) {
builder.append(", with filter)");
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
index 70fc7fb..b8ca4e8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransitionAction.java
@@ -22,7 +22,7 @@ package org.apache.flink.cep.nfa;
* Set of actions when doing a state transition from a {@link State} to another.
*/
public enum StateTransitionAction {
- TAKE, // take the current event and assign it to the new state
- IGNORE, // ignore the current event and do the state transition
+ TAKE, // take the current event and assign it to the current state
+ IGNORE, // ignore the current event; the target may be the current state itself or another state
PROCEED // do the state transition and keep the current event for further processing (epsilon transition)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
deleted file mode 100644
index a3bb5f4..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa.compiler;
-
-/**
- * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
- * was not specified correctly.
- */
-public class MalformedPatternException extends RuntimeException {
-
- private static final long serialVersionUID = 7751134834983361543L;
-
- public MalformedPatternException(String message) {
- super(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 18ed21f..b476c49 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -22,18 +22,21 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
-import org.apache.flink.cep.nfa.StateTransitionAction;
+import org.apache.flink.cep.pattern.FilterFunctions;
import org.apache.flink.cep.pattern.FollowedByPattern;
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.cep.pattern.NotFilterFunction;
import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
+import java.util.List;
import java.util.Set;
/**
@@ -42,7 +45,7 @@ import java.util.Set;
*/
public class NFACompiler {
- protected final static String BEGINNING_STATE_NAME = "$beginningState$";
+ protected static final String ENDING_STATE_NAME = "$endState$";
/**
* Compiles the given pattern into a {@link NFA}.
@@ -74,88 +77,288 @@ public class NFACompiler {
*/
@SuppressWarnings("unchecked")
public static <T> NFAFactory<T> compileFactory(
- Pattern<T, ?> pattern,
- TypeSerializer<T> inputTypeSerializer,
+ final Pattern<T, ?> pattern,
+ final TypeSerializer<T> inputTypeSerializer,
boolean timeoutHandling) {
if (pattern == null) {
// return a factory for empty NFAs
- return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+ return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
} else {
- // set of all generated states
- Map<String, State<T>> states = new HashMap<>();
- long windowTime;
+ final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
+ nfaFactoryCompiler.compileFactory();
+ return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+ }
+ }
+
+ /**
+ * Converts a {@link Pattern} into a graph of {@link State}s. It enables sharing of
+ * compilation state across methods.
+ *
+ * @param <T> type of the input events
+ */
+ private static class NFAFactoryCompiler<T> {
+
+ private final Set<String> usedNames = new HashSet<>();
+ private final List<State<T>> states = new ArrayList<>();
- // this is used to enforse pattern name uniqueness.
- Set<String> patternNames = new HashSet<>();
+ private long windowTime = 0;
+ private Pattern<T, ?> currentPattern;
- Pattern<T, ?> succeedingPattern;
- State<T> succeedingState;
- Pattern<T, ?> currentPattern = pattern;
+ NFAFactoryCompiler(final Pattern<T, ?> pattern) {
+ this.currentPattern = pattern;
+ }
+ /**
+ * Builds the NFA states for the pattern, from the final state back to the start state. The
+ * results are exposed via {@link #getStates()} and {@link #getWindowTime()}.
+ */
+ void compileFactory() {
// we're traversing the pattern from the end to the beginning --> the first state is the final state
- State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
- patternNames.add(currentPattern.getName());
+ State<T> sinkState = createEndingState();
+ // add all the normal states
+ sinkState = createMiddleStates(sinkState);
+ // add the beginning state
+ createStartState(sinkState);
+ }
- states.put(currentPattern.getName(), currentState);
+ List<State<T>> getStates() {
+ return states;
+ }
+
+ long getWindowTime() {
+ return windowTime;
+ }
+
+ /**
+ * Creates the dummy Final {@link State} of the NFA graph.
+ * @return dummy Final state
+ */
+ private State<T> createEndingState() {
+ State<T> endState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+ states.add(endState);
+ usedNames.add(ENDING_STATE_NAME);
windowTime = currentPattern.getWindowTime() != null ? currentPattern.getWindowTime().toMilliseconds() : 0L;
+ return endState;
+ }
- while (currentPattern.getPrevious() != null) {
- succeedingPattern = currentPattern;
- succeedingState = currentState;
- currentPattern = currentPattern.getPrevious();
+ /**
+ * Creates all the states between Start and Final state.
+ * @param sinkState the state that last state should point to (always the Final state)
+ * @return the next state after Start in the resulting graph
+ */
+ private State<T> createMiddleStates(final State<T> sinkState) {
- if (!patternNames.add(currentPattern.getName())) {
- throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
- "Pattern names must be unique.");
+ State<T> lastSink = sinkState;
+ while (currentPattern.getPrevious() != null) {
+ checkPatternNameUniqueness();
+
+ State<T> sourceState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(sourceState);
+ usedNames.add(sourceState.getName());
+
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+ convertToLooping(sourceState, lastSink);
+
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+ sourceState = createFirstMandatoryStateOfLoop(sourceState, State.StateType.Normal);
+ states.add(sourceState);
+ usedNames.add(sourceState.getName());
+ }
+ } else if (currentPattern.getQuantifier() == Quantifier.TIMES) {
+ sourceState = convertToTimesState(sourceState, lastSink, currentPattern.getTimes());
+ } else {
+ convertToSingletonState(sourceState, lastSink);
}
- Time currentWindowTime = currentPattern.getWindowTime();
+ currentPattern = currentPattern.getPrevious();
+ lastSink = sourceState;
+ final Time currentWindowTime = currentPattern.getWindowTime();
if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
// the window time is the global minimum of all window times of each state
windowTime = currentWindowTime.toMilliseconds();
}
+ }
+
+ return lastSink;
+ }
+
+ /**
+ * Throws a {@link MalformedPatternException} if the name of the current pattern has already
+ * been used by a state created earlier.
+ */
+ private void checkPatternNameUniqueness() {
+ if (usedNames.contains(currentPattern.getName())) {
+ throw new MalformedPatternException(
+ "Duplicate pattern name: " + currentPattern.getName() + ". " +
+ "Pattern names must be unique.");
+ }
+ }
+
+ /**
+ * Creates the Start {@link State} of the resulting NFA graph.
+ * @param sinkState the state that Start state should point to (always the first of the middle states)
+ * @return created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createStartState(State<T> sinkState) {
+ checkPatternNameUniqueness();
+
+ final State<T> beginningState;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
+ final State<T> loopingState;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.AT_LEAST_ONE)) {
+ loopingState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ beginningState = createFirstMandatoryStateOfLoop(loopingState, State.StateType.Start);
+ states.add(loopingState);
+ } else {
+ loopingState = new State<>(currentPattern.getName(), State.StateType.Start);
+ beginningState = loopingState;
+ }
+ convertToLooping(loopingState, sinkState, true);
+ } else {
+ if (currentPattern.getQuantifier() == Quantifier.TIMES && currentPattern.getTimes() > 1) {
+ final State<T> timesState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(timesState);
+ sinkState = convertToTimesState(timesState, sinkState, currentPattern.getTimes() - 1);
+ }
- if (states.containsKey(currentPattern.getName())) {
- currentState = states.get(currentPattern.getName());
+ beginningState = new State<>(currentPattern.getName(), State.StateType.Start);
+ convertToSingletonState(beginningState, sinkState);
+ }
+
+ states.add(beginningState);
+ usedNames.add(beginningState.getName());
+
+ return beginningState;
+ }
+
+ /**
+ * Converts the given state into a "complex" state consisting of the given number of states with
+ * the same {@link FilterFunction}.
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ * @param times number of times the state should be copied
+ * @return the first state of the "complex" state, next state should point to it
+ */
+ private State<T> convertToTimesState(final State<T> sourceState, final State<T> sinkState, int times) {
+ convertToSingletonState(sourceState, sinkState);
+ State<T> lastSink;
+ State<T> firstState = sourceState;
+ for (int i = 0; i < times - 1; i++) {
+ lastSink = firstState;
+ firstState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ states.add(firstState);
+ convertToSingletonState(firstState, lastSink);
+ }
+ return firstState;
+ }
+
+ /**
+ * Converts the given state into a simple single state. For an OPTIONAL {@link FollowedByPattern}
+ * it also creates a similar ignore state without the PROCEED edge, so that for each PROCEED
+ * transition branches in the computation state graph can be created only once.
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState state that the state being converted should point to
+ */
+ @SuppressWarnings("unchecked")
+ private void convertToSingletonState(final State<T> sourceState, final State<T> sinkState) {
+
+ final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final FilterFunction<T> trueFunction = FilterFunctions.trueFunction();
+ sourceState.addTake(sinkState, currentFilterFunction);
+
+ if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+ sourceState.addProceed(sinkState, trueFunction);
+ }
+
+ if (currentPattern instanceof FollowedByPattern) {
+ final State<T> ignoreState;
+ if (currentPattern.getQuantifier() == Quantifier.OPTIONAL) {
+ ignoreState = new State<>(currentPattern.getName(), State.StateType.Normal);
+ ignoreState.addTake(sinkState, currentFilterFunction);
+ states.add(ignoreState);
} else {
- currentState = new State<>(currentPattern.getName(), State.StateType.Normal);
- states.put(currentState.getName(), currentState);
+ ignoreState = sourceState;
}
+ sourceState.addIgnore(ignoreState, trueFunction);
+ }
+ }
- currentState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.TAKE,
- succeedingState,
- (FilterFunction<T>) succeedingPattern.getFilterFunction()));
-
- if (succeedingPattern instanceof FollowedByPattern) {
- // the followed by pattern entails a reflexive ignore transition
- currentState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.IGNORE,
- currentState,
- null
- ));
+ /**
+ * Patterns with quantifiers AT_LEAST_ONE_* are converted into a pair of states: a singleton state and
+ * looping state. This method creates the first of the two.
+ *
+ * @param sinkState the state the newly created state should point to, it should be a looping state
+ * @param stateType the type of the created state, as the NFA graph can also start with AT_LEAST_ONE_*
+ * @return the newly created state
+ */
+ @SuppressWarnings("unchecked")
+ private State<T> createFirstMandatoryStateOfLoop(final State<T> sinkState, final State.StateType stateType) {
+
+ final FilterFunction<T> currentFilterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final State<T> firstState = new State<>(currentPattern.getName(), stateType);
+
+ firstState.addTake(sinkState, currentFilterFunction);
+ if (currentPattern instanceof FollowedByPattern) {
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+ firstState.addIgnore(new NotFilterFunction<>(currentFilterFunction));
+ } else {
+ firstState.addIgnore(FilterFunctions.<T>trueFunction());
}
}
+ return firstState;
+ }
- // add the beginning state
- final State<T> beginningState;
+ /**
+ * Converts the given state into a looping one. A looping state has a TAKE edge to itself and a
+ * PROCEED edge to the sinkState. For a {@link FollowedByPattern} (or the first state) it also gets a
+ * companion ignore state without the PROCEED edge, so that PROCEED branches are created only once.
+ *
+ * <p>If this looping state is first of a graph we should treat the {@link Pattern} as {@link FollowedByPattern}
+ * to enable combinations.
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ * @param isFirstState if the looping state is first of a graph
+ */
+ @SuppressWarnings("unchecked")
+ private void convertToLooping(final State<T> sourceState, final State<T> sinkState, boolean isFirstState) {
+
+ final FilterFunction<T> filterFunction = (FilterFunction<T>) currentPattern.getFilterFunction();
+ final FilterFunction<T> trueFunction = FilterFunctions.<T>trueFunction();
+
+ sourceState.addProceed(sinkState, trueFunction);
+ sourceState.addTake(filterFunction);
+ if (currentPattern instanceof FollowedByPattern || isFirstState) {
+ final State<T> ignoreState = new State<>(
+ currentPattern.getName(),
+ State.StateType.Normal);
+
+
+ final FilterFunction<T> ignoreCondition;
+ if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)) {
+ ignoreCondition = new NotFilterFunction<>(filterFunction);
+ } else {
+ ignoreCondition = trueFunction;
+ }
- if (states.containsKey(BEGINNING_STATE_NAME)) {
- beginningState = states.get(BEGINNING_STATE_NAME);
- } else {
- beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
- states.put(BEGINNING_STATE_NAME, beginningState);
+ sourceState.addIgnore(ignoreState, ignoreCondition);
+ ignoreState.addTake(sourceState, filterFunction);
+ ignoreState.addIgnore(ignoreState, ignoreCondition);
+ states.add(ignoreState);
}
+ }
- beginningState.addStateTransition(new StateTransition<T>(
- StateTransitionAction.TAKE,
- currentState,
- (FilterFunction<T>) currentPattern.getFilterFunction()
- ));
-
- return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
+ /**
+ * Converts the given state into a looping one. A looping state has a TAKE edge to itself and a
+ * PROCEED edge to the sinkState. For a {@link FollowedByPattern} it also gets a companion ignore
+ * state without the PROCEED edge, so that PROCEED branches are created only once.
+ *
+ * @param sourceState the state to be converted
+ * @param sinkState the state that the converted state should point to
+ */
+ private void convertToLooping(final State<T> sourceState, final State<T> sinkState) {
+ convertToLooping(sourceState, sinkState, false);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
new file mode 100644
index 0000000..12e58ba
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FilterFunctions.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * Factory for constant {@link FilterFunction}s that accept every event ({@link #trueFunction()})
+ * or reject every event ({@link #falseFunction()}).
+ *
+ * <p>These functions are used as transition conditions inside the NFA, which is serialized. They are
+ * therefore implemented as named static nested classes with explicit {@code serialVersionUID}s:
+ * anonymous classes get compiler-generated names and serial IDs that are not stable.
+ *
+ * @param <T> unused; the static factory methods declare their own type parameter
+ */
+public final class FilterFunctions<T> {
+
+ private FilterFunctions() {
+ }
+
+ /**
+ * Returns a filter that accepts every event.
+ *
+ * @param <T> type of the filtered events
+ * @return a filter whose {@code filter} method always returns {@code true}
+ */
+ public static <T> FilterFunction<T> trueFunction() {
+ return new TrueFilterFunction<T>();
+ }
+
+ /**
+ * Returns a filter that rejects every event.
+ *
+ * @param <T> type of the filtered events
+ * @return a filter whose {@code filter} method always returns {@code false}
+ */
+ public static <T> FilterFunction<T> falseFunction() {
+ return new FalseFilterFunction<T>();
+ }
+
+ /** Filter that accepts every event. */
+ private static final class TrueFilterFunction<T> implements FilterFunction<T> {
+
+ private static final long serialVersionUID = 8379409657655181451L;
+
+ @Override
+ public boolean filter(T value) throws Exception {
+ return true;
+ }
+ }
+
+ /** Filter that rejects every event. */
+ private static final class FalseFilterFunction<T> implements FilterFunction<T> {
+
+ private static final long serialVersionUID = -2685925052549138592L;
+
+ @Override
+ public boolean filter(T value) throws Exception {
+ return false;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
new file mode 100644
index 0000000..c85f3be
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+/**
+ * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly, for example when a second quantifier is applied to a pattern
+ * that already has one.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+	private static final long serialVersionUID = 7751134834983361543L;
+
+	/**
+	 * Creates a new exception with the given detail message.
+	 *
+	 * @param message description of what is wrong with the pattern
+	 */
+	public MalformedPatternException(String message) {
+		super(message);
+	}
+
+	/**
+	 * Creates a new exception with the given detail message and underlying cause, so that a
+	 * lower-level failure discovered while validating a pattern is not lost.
+	 *
+	 * @param message description of what is wrong with the pattern
+	 * @param cause the exception that revealed the problem; may be {@code null}
+	 */
+	public MalformedPatternException(String message, Throwable cause) {
+		super(message, cause);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
new file mode 100644
index 0000000..a4fc8f5
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/NotFilterFunction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A {@link FilterFunction} that inverts the verdict of another filter function: an element is
+ * accepted exactly when the wrapped function rejects it, and vice versa.
+ *
+ * <p>Any exception thrown by the wrapped function propagates unchanged.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class NotFilterFunction<T> implements FilterFunction<T> {
+
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	/** The filter whose result is negated. */
+	private final FilterFunction<T> original;
+
+	/**
+	 * Wraps the given filter so that its result is negated.
+	 *
+	 * @param original the filter function to negate; it is stored as-is (no null check here)
+	 */
+	public NotFilterFunction(final FilterFunction<T> original) {
+		this.original = original;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		final boolean originalVerdict = original.filter(value);
+		return !originalVerdict;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 7ea675f..7b4d9c7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.util.Preconditions;
/**
* Base class for a pattern definition.
@@ -53,6 +54,10 @@ public class Pattern<T, F extends T> {
// window length in which the pattern match has to occur
private Time windowTime;
+	/** Quantifier of this pattern; starts as {@link Quantifier#ONE} until a quantifier method is called. */
+	private Quantifier quantifier = Quantifier.ONE;
+
+	/** Exact repetition count written by {@link #times(int)}; {@code 0} by default. */
+	private int times = 0;
+
protected Pattern(final String name, final Pattern<T, ? extends T> previous) {
this.name = name;
this.previous = previous;
@@ -74,6 +79,14 @@ public class Pattern<T, F extends T> {
return windowTime;
}
+	/**
+	 * Returns the quantifier applied to this pattern.
+	 *
+	 * @return the current quantifier; the field is initialized to {@link Quantifier#ONE}
+	 */
+	public Quantifier getQuantifier() {
+		return this.quantifier;
+	}
+
+	/**
+	 * Returns the exact number of repetitions requested through {@link #times(int)}.
+	 *
+	 * @return the configured repetition count; the field defaults to {@code 0}
+	 */
+	public int getTimes() {
+		return this.times;
+	}
+
/**
* Specifies a filter condition which has to be fulfilled by an event in order to be matched.
*
@@ -183,4 +196,106 @@ public class Pattern<T, F extends T> {
return new Pattern<X, X>(name, null);
}
+	/**
+	 * Specifies that this pattern can occur zero or more times (Kleene star), i.e. any number of
+	 * events can be matched in this state. Matching is eager; this is the same as calling
+	 * {@code zeroOrMore(true)}.
+	 *
+	 * @return this pattern, with the Kleene star quantifier applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 */
+	public Pattern<T, F> zeroOrMore() {
+		final boolean eager = true;
+		return zeroOrMore(eager);
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero or more times (Kleene star), i.e. any number of
+	 * events can be matched in this state.
+	 *
+	 * <p>For the pattern {@code A* B} and the input {@code A1 A2 B}, eager matching yields
+	 * {@code B}, {@code A1 B} and {@code A1 A2 B}; with eagerness disabled the result is
+	 * {@code B}, {@code A1 B}, {@code A2 B} and {@code A1 A2 B}.
+	 *
+	 * @param eager if {@code true}, the pattern always consumes earlier events
+	 * @return this pattern, with the Kleene star quantifier applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 */
+	public Pattern<T, F> zeroOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		this.quantifier = eager
+			? Quantifier.ZERO_OR_MORE_EAGER
+			: Quantifier.ZERO_OR_MORE_COMBINATIONS;
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times (Kleene plus), i.e. at least one and
+	 * at most an unbounded number of events can be matched in this state. Matching is eager; this
+	 * is the same as calling {@code oneOrMore(true)}.
+	 *
+	 * @return this pattern, with the Kleene plus quantifier applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 */
+	public Pattern<T, F> oneOrMore() {
+		final boolean eager = true;
+		return oneOrMore(eager);
+	}
+
+	/**
+	 * Specifies that this pattern can occur one or more times (Kleene plus), i.e. at least one and
+	 * at most an unbounded number of events can be matched in this state.
+	 *
+	 * <p>For the pattern {@code A+ B} and the input {@code A1 A2 B}, eager matching yields
+	 * {@code A1 B} and {@code A1 A2 B}; with eagerness disabled the result is {@code A1 B},
+	 * {@code A2 B} and {@code A1 A2 B}.
+	 *
+	 * @param eager if {@code true}, the pattern always consumes earlier events
+	 * @return this pattern, with the Kleene plus quantifier applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 */
+	public Pattern<T, F> oneOrMore(final boolean eager) {
+		checkIfQuantifierApplied();
+		this.quantifier = eager
+			? Quantifier.ONE_OR_MORE_EAGER
+			: Quantifier.ONE_OR_MORE_COMBINATIONS;
+		return this;
+	}
+
+	/**
+	 * Specifies that this pattern can occur zero times or exactly once.
+	 *
+	 * @return this pattern, with the optional ({@code ?}) quantifier applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 */
+	public Pattern<T, F> optional() {
+		checkIfQuantifierApplied();
+		quantifier = Quantifier.OPTIONAL;
+		return this;
+	}
+
+	/**
+	 * Specifies the exact number of times this pattern has to be matched.
+	 *
+	 * @param times number of times a matching event must appear; must be at least {@code 1}
+	 * @return this pattern, with the repetition count applied
+	 *
+	 * @throws MalformedPatternException if a quantifier has already been applied to this pattern
+	 *         (checked before {@code times} is validated)
+	 * @throws IllegalArgumentException if {@code times} is not positive
+	 */
+	public Pattern<T, F> times(int times) {
+		checkIfQuantifierApplied();
+		Preconditions.checkArgument(times >= 1, "You should give a positive number greater than 0.");
+		this.times = times;
+		this.quantifier = Quantifier.TIMES;
+		return this;
+	}
+
+	/**
+	 * Guards against applying more than one quantifier to the same pattern.
+	 *
+	 * @throws MalformedPatternException if the quantifier is no longer {@link Quantifier#ONE}
+	 */
+	private void checkIfQuantifierApplied() {
+		if (quantifier == Quantifier.ONE) {
+			return;
+		}
+		throw new MalformedPatternException("Already applied quantifier to this Pattern. Current quantifier is: " + this.quantifier);
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9001c4ef/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
new file mode 100644
index 0000000..7abe9bd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.pattern;
+
+import java.util.EnumSet;
+
+/**
+ * How many times a pattern may or must match. Each constant carries a (possibly empty) set of
+ * {@link QuantifierProperty} flags, which can be queried through
+ * {@link #hasProperty(QuantifierProperty)}.
+ */
+public enum Quantifier {
+	/** Exactly one match; the initial quantifier of a pattern. */
+	ONE,
+	/** Zero or more matches, eager variant ({@code zeroOrMore(true)}). */
+	ZERO_OR_MORE_EAGER(QuantifierProperty.LOOPING, QuantifierProperty.EAGER),
+	/** Zero or more matches, non-eager variant ({@code zeroOrMore(false)}). */
+	ZERO_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING),
+	/** One or more matches, eager variant ({@code oneOrMore(true)}). */
+	ONE_OR_MORE_EAGER(
+		QuantifierProperty.LOOPING,
+		QuantifierProperty.EAGER,
+		QuantifierProperty.AT_LEAST_ONE),
+	/** One or more matches, non-eager variant ({@code oneOrMore(false)}). */
+	ONE_OR_MORE_COMBINATIONS(QuantifierProperty.LOOPING, QuantifierProperty.AT_LEAST_ONE),
+	/** An exact number of matches, set through {@code times(int)}. */
+	TIMES,
+	/** Zero matches or one match ({@code optional()}). */
+	OPTIONAL;
+
+	/** Flags of this quantifier; never {@code null}, possibly empty. */
+	private final EnumSet<QuantifierProperty> properties;
+
+	Quantifier(final QuantifierProperty... flags) {
+		final EnumSet<QuantifierProperty> collected = EnumSet.noneOf(QuantifierProperty.class);
+		for (QuantifierProperty flag : flags) {
+			collected.add(flag);
+		}
+		this.properties = collected;
+	}
+
+	/**
+	 * Checks whether this quantifier carries the given flag.
+	 *
+	 * @param property the flag to look up
+	 * @return {@code true} if {@code property} is one of this quantifier's flags
+	 */
+	public boolean hasProperty(QuantifierProperty property) {
+		return this.properties.contains(property);
+	}
+
+	/** Flags that characterize a {@link Quantifier}. */
+	public enum QuantifierProperty {
+		/** Carried by the zero-or-more and one-or-more quantifiers. */
+		LOOPING,
+		/** Carried by the eager variants of the looping quantifiers. */
+		EAGER,
+		/** Carried by the one-or-more quantifiers. */
+		AT_LEAST_ONE
+	}
+
+}