You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:15 UTC
[04/50] [abbrv] flink git commit: [FLINK-3318] Backward compatibility
of CEP NFA
[FLINK-3318] Backward compatibility of CEP NFA
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d20fb090
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d20fb090
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d20fb090
Branch: refs/heads/table-retraction
Commit: d20fb090c31858bc0372a8c84228d796558d56b0
Parents: 9001c4e
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Tue Mar 21 10:53:07 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Thu Mar 23 10:47:55 2017 +0100
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 162 ++++++-
.../org/apache/flink/cep/nfa/SharedBuffer.java | 49 ++
.../java/org/apache/flink/cep/nfa/State.java | 19 +-
.../flink/cep/nfa/compiler/NFACompiler.java | 116 +++++
.../org/apache/flink/cep/nfa/NFAITCase.java | 194 +++++++-
.../cep/operator/CEPMigration12to13Test.java | 477 +++++++++++++++++++
.../test/resources/cep-branching-snapshot-1.2 | Bin 0 -> 6736 bytes
.../resources/cep-single-pattern-snapshot-1.2 | Bin 0 -> 3311 bytes
.../test/resources/cep-starting-snapshot-1.2 | Bin 0 -> 6526 bytes
9 files changed, 986 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 3d42248..ab03566 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.nfa;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
import com.google.common.collect.LinkedHashMultimap;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -25,18 +27,22 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.cep.NonDuplicatingTypeSerializer;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.windowing.time.Time;
+import javax.annotation.Nullable;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.io.OptionalDataException;
import java.io.Serializable;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -85,9 +91,9 @@ public class NFA<T> implements Serializable {
private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
/**
- * Buffer used to store the matched events.
+ * Used only for backward compatibility. Buffer used to store the matched events.
*/
- private final SharedBuffer<String, T> sharedBuffer;
+ private final SharedBuffer<State<T>, T> sharedBuffer = null;
/**
* A set of all the valid NFA states, as returned by the
@@ -110,12 +116,22 @@ public class NFA<T> implements Serializable {
private final boolean handleTimeout;
/**
+ * Used only for backward compatibility.
+ */
+ private int startEventCounter;
+
+ /**
* Current set of {@link ComputationState computation states} within the state machine.
* These are the "active" intermediate states that are waiting for new matching
* events to transition to new valid states.
*/
private transient Queue<ComputationState<T>> computationStates;
+ /**
+ * Buffer used to store the matched events.
+ */
+ private final SharedBuffer<String, T> stringSharedBuffer;
+
public NFA(
final TypeSerializer<T> eventSerializer,
final long windowTime,
@@ -124,7 +140,7 @@ public class NFA<T> implements Serializable {
this.nonDuplicatingTypeSerializer = new NonDuplicatingTypeSerializer<>(eventSerializer);
this.windowTime = windowTime;
this.handleTimeout = handleTimeout;
- sharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
+ stringSharedBuffer = new SharedBuffer<>(nonDuplicatingTypeSerializer);
computationStates = new LinkedList<>();
states = new HashSet<>();
@@ -156,7 +172,7 @@ public class NFA<T> implements Serializable {
* {@code false} otherwise.
*/
public boolean isEmpty() {
- return sharedBuffer.isEmpty();
+ return stringSharedBuffer.isEmpty();
}
/**
@@ -194,9 +210,14 @@ public class NFA<T> implements Serializable {
}
}
- // remove computation state which has exceeded the window length
- sharedBuffer.release(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
- sharedBuffer.remove(computationState.getState().getName(), computationState.getEvent(), computationState.getTimestamp());
+ stringSharedBuffer.release(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
+ stringSharedBuffer.remove(
+ computationState.getPreviousState().getName(),
+ computationState.getEvent(),
+ computationState.getTimestamp());
newComputationStates = Collections.emptyList();
} else if (event != null) {
@@ -212,8 +233,8 @@ public class NFA<T> implements Serializable {
result.addAll(matches);
// remove found patterns because they are no longer needed
- sharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
- sharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ stringSharedBuffer.release(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
+ stringSharedBuffer.remove(newComputationState.getPreviousState().getName(), newComputationState.getEvent(), newComputationState.getTimestamp());
} else {
// add new computation state; it will be processed once the next event arrives
computationStates.add(newComputationState);
@@ -230,7 +251,7 @@ public class NFA<T> implements Serializable {
// remove all elements which are expired
// with respect to the window length
- sharedBuffer.prune(pruningTimestamp);
+ stringSharedBuffer.prune(pruningTimestamp);
}
}
@@ -244,7 +265,7 @@ public class NFA<T> implements Serializable {
NFA<T> other = (NFA<T>) obj;
return nonDuplicatingTypeSerializer.equals(other.nonDuplicatingTypeSerializer) &&
- sharedBuffer.equals(other.sharedBuffer) &&
+ stringSharedBuffer.equals(other.stringSharedBuffer) &&
states.equals(other.states) &&
windowTime == other.windowTime;
} else {
@@ -254,7 +275,7 @@ public class NFA<T> implements Serializable {
@Override
public int hashCode() {
- return Objects.hash(nonDuplicatingTypeSerializer, sharedBuffer, states, windowTime);
+ return Objects.hash(nonDuplicatingTypeSerializer, stringSharedBuffer, states, windowTime);
}
private static <T> boolean isEquivalentState(final State<T> s1, final State<T> s2) {
@@ -376,7 +397,7 @@ public class NFA<T> implements Serializable {
computationState.getStartTimestamp()
)
);
- sharedBuffer.lock(
+ stringSharedBuffer.lock(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
@@ -397,14 +418,14 @@ public class NFA<T> implements Serializable {
final long startTimestamp;
if (computationState.isStartState()) {
startTimestamp = timestamp;
- sharedBuffer.put(
+ stringSharedBuffer.put(
consumingState.getName(),
event,
timestamp,
currentVersion);
} else {
startTimestamp = computationState.getStartTimestamp();
- sharedBuffer.put(
+ stringSharedBuffer.put(
consumingState.getName(),
event,
timestamp,
@@ -415,7 +436,7 @@ public class NFA<T> implements Serializable {
}
// a new computation state is referring to the shared entry
- sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
resultingComputationStates.add(ComputationState.createState(
newState,
@@ -429,7 +450,7 @@ public class NFA<T> implements Serializable {
//check if newly created state is optional (have a PROCEED path to Final state)
final State<T> finalState = findFinalStateAfterProceed(newState, event);
if (finalState != null) {
- sharedBuffer.lock(consumingState.getName(), event, timestamp);
+ stringSharedBuffer.lock(consumingState.getName(), event, timestamp);
resultingComputationStates.add(ComputationState.createState(
finalState,
consumingState,
@@ -450,12 +471,12 @@ public class NFA<T> implements Serializable {
if (computationState.getEvent() != null) {
// release the shared entry referenced by the current computation state.
- sharedBuffer.release(
+ stringSharedBuffer.release(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
// try to remove unnecessary shared buffer entries
- sharedBuffer.remove(
+ stringSharedBuffer.remove(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp());
@@ -546,7 +567,7 @@ public class NFA<T> implements Serializable {
* @return Collection of event sequences which end in the given computation state
*/
private Collection<Map<String, T>> extractPatternMatches(final ComputationState<T> computationState) {
- Collection<LinkedHashMultimap<String, T>> paths = sharedBuffer.extractPatterns(
+ Collection<LinkedHashMultimap<String, T>> paths = stringSharedBuffer.extractPatterns(
computationState.getPreviousState().getName(),
computationState.getEvent(),
computationState.getTimestamp(),
@@ -592,6 +613,8 @@ public class NFA<T> implements Serializable {
nonDuplicatingTypeSerializer.clearReferences();
}
+ private final static String BEGINNING_STATE_NAME = "$beginningState$";
+
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ois.defaultReadObject();
@@ -599,15 +622,103 @@ public class NFA<T> implements Serializable {
computationStates = new LinkedList<>();
+ final List<ComputationState<T>> readComputationStates = new ArrayList<>(numberComputationStates);
+
+ boolean afterMigration = false;
for (int i = 0; i < numberComputationStates; i++) {
ComputationState<T> computationState = readComputationState(ois);
+ if (computationState.getState().getName().equals(BEGINNING_STATE_NAME)) {
+ afterMigration = true;
+ }
- computationStates.offer(computationState);
+ readComputationStates.add(computationState);
+ }
+
+ if (afterMigration && !readComputationStates.isEmpty()) {
+ try {
+ //Backwards compatibility
+ this.computationStates.addAll(migrateNFA(readComputationStates));
+ final Field newSharedBufferField = NFA.class.getDeclaredField("stringSharedBuffer");
+ final Field sharedBufferField = NFA.class.getDeclaredField("sharedBuffer");
+ sharedBufferField.setAccessible(true);
+ newSharedBufferField.setAccessible(true);
+ newSharedBufferField.set(this, SharedBuffer.migrateSharedBuffer(this.sharedBuffer));
+ sharedBufferField.set(this, null);
+ sharedBufferField.setAccessible(false);
+ newSharedBufferField.setAccessible(false);
+ } catch (Exception e) {
+ throw new IllegalStateException("Could not migrate from earlier version", e);
+ }
+ } else {
+ this.computationStates.addAll(readComputationStates);
}
nonDuplicatingTypeSerializer.clearReferences();
}
+ /**
+ * Needed for backward compatibility. First migrates the {@link State} graph (see {@link NFACompiler#migrateGraph(State)}).
+ * Then recreates the {@link ComputationState}s with the new {@link State} graph.
+ * @param readStates computation states read from snapshot
+ * @return collection of migrated computation states
+ */
+ private Collection<ComputationState<T>> migrateNFA(Collection<ComputationState<T>> readStates) {
+ final ArrayList<ComputationState<T>> computationStates = new ArrayList<>();
+
+ final State<T> startState = Iterators.find(
+ readStates.iterator(),
+ new Predicate<ComputationState<T>>() {
+ @Override
+ public boolean apply(@Nullable ComputationState<T> input) {
+ return input != null && input.getState().getName().equals(BEGINNING_STATE_NAME);
+ }
+ }).getState();
+
+ final Map<String, State<T>> convertedStates = NFACompiler.migrateGraph(startState);
+
+ for (ComputationState<T> readState : readStates) {
+ if (!readState.isStartState()) {
+ final String previousName = readState.getState().getName();
+ final String currentName = Iterators.find(
+ readState.getState().getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+ }).getTargetState().getName();
+
+
+ final State<T> previousState = convertedStates.get(previousName);
+
+ computationStates.add(ComputationState.createState(
+ convertedStates.get(currentName),
+ previousState,
+ readState.getEvent(),
+ readState.getTimestamp(),
+ readState.getVersion(),
+ readState.getStartTimestamp()
+ ));
+ }
+ }
+
+ final String startName = Iterators.find(convertedStates.values().iterator(), new Predicate<State<T>>() {
+ @Override
+ public boolean apply(@Nullable State<T> input) {
+ return input != null && input.isStart();
+ }
+ }).getName();
+
+ computationStates.add(ComputationState.createStartState(
+ convertedStates.get(startName),
+ new DeweyNumber(this.startEventCounter)));
+
+ this.states.clear();
+ this.states.addAll(convertedStates.values());
+
+ return computationStates;
+ }
+
private void writeComputationState(final ComputationState<T> computationState, final ObjectOutputStream oos) throws IOException {
oos.writeObject(computationState.getState());
oos.writeObject(computationState.getPreviousState());
@@ -629,7 +740,13 @@ public class NFA<T> implements Serializable {
@SuppressWarnings("unchecked")
private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
final State<T> state = (State<T>)ois.readObject();
- final State<T> previousState = (State<T>)ois.readObject();
+ State<T> previousState;
+ try {
+ previousState = (State<T>)ois.readObject();
+ } catch (OptionalDataException e) {
+ previousState = null;
+ }
+
final long timestamp = ois.readLong();
final DeweyNumber version = (DeweyNumber)ois.readObject();
final long startTimestamp = ois.readLong();
@@ -647,6 +764,7 @@ public class NFA<T> implements Serializable {
return ComputationState.createState(state, previousState, event, timestamp, version, startTimestamp);
}
+
/**
* Generates a state name from a given name template and an index.
* <p>
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index e6a8c75..d5b7876 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
import com.google.common.collect.LinkedHashMultimap;
import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
@@ -463,6 +464,54 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
}
+ private SharedBuffer(
+ TypeSerializer<V> valueSerializer,
+ Map<K, SharedBufferPage<K, V>> pages) {
+ this.valueSerializer = valueSerializer;
+ this.pages = pages;
+ }
+
+ /**
+ * For backward compatibility only. Previously the key in {@link SharedBuffer} was {@link State}.
+ * Now it is {@link String}.
+ */
+ @Internal
+ static <T> SharedBuffer<String, T> migrateSharedBuffer(SharedBuffer<State<T>, T> buffer) {
+
+ final Map<String, SharedBufferPage<String, T>> pageMap = new HashMap<>();
+ final Map<SharedBufferEntry<State<T>, T>, SharedBufferEntry<String, T>> entries = new HashMap<>();
+
+ for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+ final SharedBufferPage<String, T> newPage = new SharedBufferPage<>(page.getKey().getName());
+ pageMap.put(newPage.getKey(), newPage);
+
+ for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+ final SharedBufferEntry<String, T> newSharedBufferEntry = new SharedBufferEntry<>(
+ pageEntry.getKey(),
+ newPage);
+ newSharedBufferEntry.referenceCounter = pageEntry.getValue().referenceCounter;
+ entries.put(pageEntry.getValue(), newSharedBufferEntry);
+ newPage.entries.put(pageEntry.getKey(), newSharedBufferEntry);
+ }
+ }
+
+ for (Map.Entry<State<T>, SharedBufferPage<State<T>, T>> page : buffer.pages.entrySet()) {
+ for (Map.Entry<ValueTimeWrapper<T>, SharedBufferEntry<State<T>, T>> pageEntry : page.getValue().entries.entrySet()) {
+ final SharedBufferEntry<String, T> newEntry = entries.get(pageEntry.getValue());
+ for (SharedBufferEdge<State<T>, T> edge : pageEntry.getValue().edges) {
+ final SharedBufferEntry<String, T> targetNewEntry = entries.get(edge.getTarget());
+
+ final SharedBufferEdge<String, T> newEdge = new SharedBufferEdge<>(
+ targetNewEntry,
+ edge.getVersion());
+ newEntry.edges.add(newEdge);
+ }
+ }
+ }
+
+ return new SharedBuffer<>(buffer.valueSerializer, pageMap);
+ }
+
private SharedBufferEntry<K, V> get(
final K key,
final V value,
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 7bcb6ea..27e0dcd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -20,9 +20,12 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.common.functions.FilterFunction;
+import java.io.IOException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import java.util.Objects;
/**
@@ -62,7 +65,6 @@ public class State<T> implements Serializable {
return stateTransitions;
}
-
private void addStateTransition(
final StateTransitionAction action,
final State<T> targetState,
@@ -132,4 +134,19 @@ public class State<T> implements Serializable {
Final, // the state is a final state for the NFA
Normal // the state is neither a start nor a final state
}
+
+ private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
+ ois.defaultReadObject();
+
+ //Backward compatibility. Previous versions of StateTransition did not have a source state
+ if (!stateTransitions.isEmpty() && stateTransitions.iterator().next().getSourceState() == null) {
+ final List<StateTransition<T>> tmp = new ArrayList<>();
+ tmp.addAll(this.stateTransitions);
+
+ this.stateTransitions.clear();
+ for (StateTransition<T> transition : tmp) {
+ addStateTransition(transition.getAction(), transition.getTargetState(), transition.getCondition());
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b476c49..8bd8612 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,10 +18,15 @@
package org.apache.flink.cep.nfa.compiler;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.State;
+import org.apache.flink.cep.nfa.StateTransition;
+import org.apache.flink.cep.nfa.StateTransitionAction;
import org.apache.flink.cep.pattern.FilterFunctions;
import org.apache.flink.cep.pattern.FollowedByPattern;
import org.apache.flink.cep.pattern.MalformedPatternException;
@@ -31,12 +36,15 @@ import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
import org.apache.flink.streaming.api.windowing.time.Time;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -363,6 +371,114 @@ public class NFACompiler {
}
/**
+ * Used for migrating CEP graphs prior to 1.3. It removes the dummy start, adds the dummy end, and translates all
+ * states to consuming ones by moving all TAKEs and IGNOREs to the next state. This method assumes that each state
+ * has at most one TAKE and one IGNORE transition and that the name of each state is unique. No PROCEED transition is allowed!
+ *
+ * @param oldStartState dummy start state of old graph
+ * @param <T> type of events
+ * @return map of new states, where key is the name of a state and value is the state itself
+ */
+ @Internal
+ public static <T> Map<String, State<T>> migrateGraph(State<T> oldStartState) {
+ State<T> oldFirst = oldStartState;
+ State<T> oldSecond = oldStartState.getStateTransitions().iterator().next().getTargetState();
+
+ StateTransition<T> oldFirstToSecondTake = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ });
+
+ StateTransition<T> oldFirstIgnore = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.IGNORE;
+ }
+
+ }, null);
+
+ StateTransition<T> oldSecondToThirdTake = Iterators.find(
+ oldSecond.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ }, null);
+
+ final Map<String, State<T>> convertedStates = new HashMap<>();
+ State<T> newSecond;
+ State<T> newFirst = new State<>(oldSecond.getName(), State.StateType.Start);
+ convertedStates.put(newFirst.getName(), newFirst);
+ while (oldSecondToThirdTake != null) {
+
+ newSecond = new State<T>(oldSecondToThirdTake.getTargetState().getName(), State.StateType.Normal);
+ convertedStates.put(newSecond.getName(), newSecond);
+ newFirst.addTake(newSecond, oldFirstToSecondTake.getCondition());
+
+ if (oldFirstIgnore != null) {
+ newFirst.addIgnore(oldFirstIgnore.getCondition());
+ }
+
+ oldFirst = oldSecond;
+
+ oldFirstToSecondTake = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ });
+
+ oldFirstIgnore = Iterators.find(
+ oldFirst.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.IGNORE;
+ }
+
+ }, null);
+
+ oldSecond = oldSecondToThirdTake.getTargetState();
+
+ oldSecondToThirdTake = Iterators.find(
+ oldSecond.getStateTransitions().iterator(),
+ new Predicate<StateTransition<T>>() {
+ @Override
+ public boolean apply(@Nullable StateTransition<T> input) {
+ return input != null && input.getAction() == StateTransitionAction.TAKE;
+ }
+
+ }, null);
+
+ newFirst = newSecond;
+ }
+
+ final State<T> endingState = new State<>(ENDING_STATE_NAME, State.StateType.Final);
+
+ newFirst.addTake(endingState, oldFirstToSecondTake.getCondition());
+
+ if (oldFirstIgnore != null) {
+ newFirst.addIgnore(oldFirstIgnore.getCondition());
+ }
+
+ convertedStates.put(endingState.getName(), endingState);
+
+ return convertedStates;
+ }
+
+ /**
* Factory interface for {@link NFA}.
*
* @param <T> Type of the input events which are processed by the NFA
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 825ba957..5b05f19 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Set;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class NFAITCase extends TestLogger {
@@ -421,7 +422,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testComplexBranchingAfterKleeneStar() {
+ public void testComplexBranchingAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -519,7 +520,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStar() {
+ public void testZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -581,7 +582,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testEagerKleeneStar() {
+ public void testEagerZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -646,7 +647,7 @@ public class NFAITCase extends TestLogger {
@Test
- public void testBeginWithKleeneStar() {
+ public void testBeginWithZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event middleEvent1 = new Event(40, "a", 2.0);
@@ -704,7 +705,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStarAfterKleeneStar() {
+ public void testZeroOrMoreAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -779,7 +780,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testKleeneStarAfterBranching() {
+ public void testZeroOrMoreAfterBranching() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -865,7 +866,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testStrictContinuityNoResultsAfterKleeneStar() {
+ public void testStrictContinuityNoResultsAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event start = new Event(40, "d", 2.0);
@@ -923,7 +924,7 @@ public class NFAITCase extends TestLogger {
}
@Test
- public void testStrictContinuityResultsAfterKleeneStar() {
+ public void testStrictContinuityResultsAfterZeroOrMore() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event start = new Event(40, "d", 2.0);
@@ -1663,4 +1664,181 @@ public class NFAITCase extends TestLogger {
), resultingPatterns);
}
+ /**
+ * Clearing SharedBuffer
+ */
+
+ @Test
+ public void testTimesClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(2).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 2);
+ nfa.process(middleEvent2, 3);
+ nfa.process(middleEvent3, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+ @Test
+ public void testOptionalClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).optional().followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent, 5);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+ @Test
+ public void testAtLeastOneClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 3);
+ nfa.process(middleEvent2, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
+
+ @Test
+ public void testZeroOrMoreClearingBuffer() {
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).zeroOrMore(false).followedBy("end1").where(new FilterFunction<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).within(Time.milliseconds(8));
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ nfa.process(startEvent, 1);
+ nfa.process(middleEvent1, 3);
+ nfa.process(middleEvent2, 4);
+ nfa.process(end1, 6);
+
+ //pruning element
+ nfa.process(null, 10);
+
+ assertEquals(true, nfa.isEmpty());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
new file mode 100644
index 0000000..65fa733
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.Test;
+
+import java.io.FileInputStream;
+import java.io.ObjectInputStream;
+import java.net.URL;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CEPMigration12to13Test {
+
+ /**
+  * Looks up a test resource through the class loader of this test class and returns the file
+  * part of its URL.
+  *
+  * <p>NOTE: the value comes from {@link URL#getFile()}, which does not decode URL escapes, so
+  * a checkout path containing e.g. spaces may not be usable as a plain file name.
+  *
+  * @param filename name of the resource to look up
+  * @return the file part of the resource URL
+  * @throws NullPointerException if the class loader cannot find the resource
+  */
+ private static String getResourceFilename(String filename) {
+     ClassLoader cl = CEPMigration12to13Test.class.getClassLoader();
+     URL resource = cl.getResource(filename);
+     if (resource == null) {
+         // name the missing file so that the failure is actionable
+         throw new NullPointerException("Missing snapshot resource: " + filename);
+     }
+     return resource.getFile();
+ }
+
+ /**
+  * Restores the operator from the checked-in {@code cep-branching-snapshot-1.2} resource and
+  * verifies that matches started before the snapshot are completed afterwards.
+  *
+  * <p>According to the (commented-out) generator code below, the snapshot was written after a
+  * start event and the two middle events had been processed. After the restore another start
+  * event and one end event are processed, and two matches (one per middle event) are expected.
+  *
+  * @throws Exception if the snapshot cannot be read or the test harness fails
+  */
+ @Test
+ public void testMigrationAfterBranchingPattern() throws Exception {
+
+     KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+         private static final long serialVersionUID = -4873366487571254798L;
+
+         @Override
+         public Integer getKey(Event value) throws Exception {
+             return value.getId();
+         }
+     };
+
+     final Event startEvent = new Event(42, "start", 1.0);
+     final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+     final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+     final Event endEvent = new Event(42, "end", 1.0);
+
+     // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new NFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processElement(new StreamRecord<Event>(startEvent, 1));
+// harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+// harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+// harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+// harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-branching-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+     OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             new KeyedCEPPatternOperator<>(
+                 Event.createTypeSerializer(),
+                 false,
+                 keySelector,
+                 IntSerializer.INSTANCE,
+                 new NFAFactory(),
+                 true),
+             keySelector,
+             BasicTypeInfo.INT_TYPE_INFO);
+
+     harness.setup();
+
+     // Read the six snapshot parts in the order in which the generator code above wrote them.
+     // try-with-resources closes the stream (and the underlying file) even if reading fails.
+     final OperatorStateHandles snapshot;
+     try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+             "cep-branching-snapshot-1.2")))) {
+         snapshot = new OperatorStateHandles(
+             (int) ois.readObject(),
+             (StreamStateHandle) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject()
+         );
+     }
+     harness.initializeState(snapshot);
+     harness.open();
+
+     harness.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4));
+     harness.processElement(new StreamRecord<>(endEvent, 5));
+
+     harness.processWatermark(new Watermark(20));
+
+     ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+     // watermark and 2 results
+     assertEquals(3, result.size());
+
+     Object resultObject1 = result.poll();
+     assertTrue(resultObject1 instanceof StreamRecord);
+     StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+     assertTrue(resultRecord1.getValue() instanceof Map);
+
+     Object resultObject2 = result.poll();
+     assertTrue(resultObject2 instanceof StreamRecord);
+     StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+     assertTrue(resultRecord2.getValue() instanceof Map);
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+     assertEquals(startEvent, patternMap1.get("start"));
+     assertEquals(middleEvent1, patternMap1.get("middle"));
+     assertEquals(endEvent, patternMap1.get("end"));
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+     assertEquals(startEvent, patternMap2.get("start"));
+     assertEquals(middleEvent2, patternMap2.get("middle"));
+     assertEquals(endEvent, patternMap2.get("end"));
+
+     harness.close();
+ }
+
+ /**
+  * Restores the operator from the checked-in {@code cep-starting-snapshot-1.2} resource and
+  * verifies that a match started before the snapshot can be completed and that a new match
+  * can be started after the restore.
+  *
+  * <p>According to the (commented-out) generator code below, the snapshot was written after
+  * {@code startEvent1} and {@code middleEvent1} had been processed. After the restore a second
+  * start event, a second middle event and one end event are processed, and three matches are
+  * expected.
+  *
+  * @throws Exception if the snapshot cannot be read or the test harness fails
+  */
+ @Test
+ public void testStartingNewPatternAfterMigration() throws Exception {
+
+     KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+         private static final long serialVersionUID = -4873366487571254798L;
+
+         @Override
+         public Integer getKey(Event value) throws Exception {
+             return value.getId();
+         }
+     };
+
+     final Event startEvent1 = new Event(42, "start", 1.0);
+     final SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
+     final Event startEvent2 = new Event(42, "start", 5.0);
+     final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10.0);
+     final Event endEvent = new Event(42, "end", 1.0);
+
+     // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new NFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processElement(new StreamRecord<Event>(startEvent1, 1));
+// harness.processElement(new StreamRecord<Event>(new Event(42, "foobar", 1.0), 2));
+// harness.processElement(new StreamRecord<Event>(new SubEvent(42, "barfoo", 1.0, 5.0), 3));
+// harness.processElement(new StreamRecord<Event>(middleEvent1, 2));
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-starting-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+     OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             new KeyedCEPPatternOperator<>(
+                 Event.createTypeSerializer(),
+                 false,
+                 keySelector,
+                 IntSerializer.INSTANCE,
+                 new NFAFactory(),
+                 true),
+             keySelector,
+             BasicTypeInfo.INT_TYPE_INFO);
+
+     harness.setup();
+
+     // Read the six snapshot parts in the order in which the generator code above wrote them.
+     // try-with-resources closes the stream (and the underlying file) even if reading fails.
+     final OperatorStateHandles snapshot;
+     try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+             "cep-starting-snapshot-1.2")))) {
+         snapshot = new OperatorStateHandles(
+             (int) ois.readObject(),
+             (StreamStateHandle) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject()
+         );
+     }
+     harness.initializeState(snapshot);
+     harness.open();
+
+     harness.processElement(new StreamRecord<>(startEvent2, 5));
+     harness.processElement(new StreamRecord<Event>(middleEvent2, 6));
+     harness.processElement(new StreamRecord<>(endEvent, 7));
+
+     harness.processWatermark(new Watermark(20));
+
+     ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+     // watermark and 3 results
+     assertEquals(4, result.size());
+
+     Object resultObject1 = result.poll();
+     assertTrue(resultObject1 instanceof StreamRecord);
+     StreamRecord<?> resultRecord1 = (StreamRecord<?>) resultObject1;
+     assertTrue(resultRecord1.getValue() instanceof Map);
+
+     Object resultObject2 = result.poll();
+     assertTrue(resultObject2 instanceof StreamRecord);
+     StreamRecord<?> resultRecord2 = (StreamRecord<?>) resultObject2;
+     assertTrue(resultRecord2.getValue() instanceof Map);
+
+     Object resultObject3 = result.poll();
+     assertTrue(resultObject3 instanceof StreamRecord);
+     StreamRecord<?> resultRecord3 = (StreamRecord<?>) resultObject3;
+     assertTrue(resultRecord3.getValue() instanceof Map);
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap1 = (Map<String, Event>) resultRecord1.getValue();
+
+     assertEquals(startEvent1, patternMap1.get("start"));
+     assertEquals(middleEvent1, patternMap1.get("middle"));
+     assertEquals(endEvent, patternMap1.get("end"));
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap2 = (Map<String, Event>) resultRecord2.getValue();
+
+     assertEquals(startEvent1, patternMap2.get("start"));
+     assertEquals(middleEvent2, patternMap2.get("middle"));
+     assertEquals(endEvent, patternMap2.get("end"));
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap3 = (Map<String, Event>) resultRecord3.getValue();
+
+     assertEquals(startEvent2, patternMap3.get("start"));
+     assertEquals(middleEvent2, patternMap3.get("middle"));
+     assertEquals(endEvent, patternMap3.get("end"));
+
+     harness.close();
+ }
+
+ /**
+  * Restores the operator from the checked-in {@code cep-single-pattern-snapshot-1.2} resource
+  * and verifies that a single-state pattern still matches afterwards.
+  *
+  * <p>According to the (commented-out) generator code below, the snapshot was written after
+  * only a watermark had been processed. After the restore one start event is processed and
+  * exactly one match containing that event is expected.
+  *
+  * @throws Exception if the snapshot cannot be read or the test harness fails
+  */
+ @Test
+ public void testSinglePatternAfterMigration() throws Exception {
+
+     KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
+         private static final long serialVersionUID = -4873366487571254798L;
+
+         @Override
+         public Integer getKey(Event value) throws Exception {
+             return value.getId();
+         }
+     };
+
+     final Event startEvent1 = new Event(42, "start", 1.0);
+
+     // uncomment these lines for regenerating the snapshot on Flink 1.2
+// OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+// new KeyedOneInputStreamOperatorTestHarness<>(
+// new KeyedCEPPatternOperator<>(
+// Event.createTypeSerializer(),
+// false,
+// keySelector,
+// IntSerializer.INSTANCE,
+// new SinglePatternNFAFactory(),
+// true),
+// keySelector,
+// BasicTypeInfo.INT_TYPE_INFO);
+//
+// harness.setup();
+// harness.open();
+// harness.processWatermark(new Watermark(5));
+// // simulate snapshot/restore with empty element queue but NFA state
+// OperatorStateHandles snapshot = harness.snapshot(1, 1);
+// FileOutputStream out = new FileOutputStream(
+// "src/test/resources/cep-single-pattern-snapshot-1.2");
+// ObjectOutputStream oos = new ObjectOutputStream(out);
+// oos.writeObject(snapshot.getOperatorChainIndex());
+// oos.writeObject(snapshot.getLegacyOperatorState());
+// oos.writeObject(snapshot.getManagedKeyedState());
+// oos.writeObject(snapshot.getRawKeyedState());
+// oos.writeObject(snapshot.getManagedOperatorState());
+// oos.writeObject(snapshot.getRawOperatorState());
+// out.close();
+// harness.close();
+
+     OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
+         new KeyedOneInputStreamOperatorTestHarness<>(
+             new KeyedCEPPatternOperator<>(
+                 Event.createTypeSerializer(),
+                 false,
+                 keySelector,
+                 IntSerializer.INSTANCE,
+                 new SinglePatternNFAFactory(),
+                 true),
+             keySelector,
+             BasicTypeInfo.INT_TYPE_INFO);
+
+     harness.setup();
+
+     // Read the six snapshot parts in the order in which the generator code above wrote them.
+     // try-with-resources closes the stream (and the underlying file) even if reading fails.
+     final OperatorStateHandles snapshot;
+     try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(getResourceFilename(
+             "cep-single-pattern-snapshot-1.2")))) {
+         snapshot = new OperatorStateHandles(
+             (int) ois.readObject(),
+             (StreamStateHandle) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<KeyGroupsStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject(),
+             (Collection<OperatorStateHandle>) ois.readObject()
+         );
+     }
+     harness.initializeState(snapshot);
+     harness.open();
+
+     harness.processElement(new StreamRecord<>(startEvent1, 5));
+
+     harness.processWatermark(new Watermark(20));
+
+     ConcurrentLinkedQueue<Object> result = harness.getOutput();
+
+     // watermark and the result
+     assertEquals(2, result.size());
+
+     Object resultObject = result.poll();
+     assertTrue(resultObject instanceof StreamRecord);
+     StreamRecord<?> resultRecord = (StreamRecord<?>) resultObject;
+     assertTrue(resultRecord.getValue() instanceof Map);
+
+     @SuppressWarnings("unchecked")
+     Map<String, Event> patternMap = (Map<String, Event>) resultRecord.getValue();
+
+     assertEquals(startEvent1, patternMap.get("start"));
+
+     harness.close();
+ }
+
+ /**
+  * NFA factory for a pattern that consists of the single state "start" (accepted by
+  * {@link StartFilter}) and a 10 ms window.
+  */
+ private static class SinglePatternNFAFactory implements NFACompiler.NFAFactory<Event> {
+
+     private static final long serialVersionUID = 1173020762472766713L;
+
+     /** Passed through as the last argument of {@code NFACompiler.compile}. */
+     private final boolean handleTimeout;
+
+     /** Creates a factory with {@code handleTimeout} set to {@code false}. */
+     private SinglePatternNFAFactory() {
+         this(false);
+     }
+
+     private SinglePatternNFAFactory(boolean timeoutHandling) {
+         handleTimeout = timeoutHandling;
+     }
+
+     @Override
+     public NFA<Event> createNFA() {
+         final Pattern<Event, ?> startOnly = Pattern.<Event>begin("start")
+             .where(new StartFilter())
+             .within(Time.milliseconds(10L));
+
+         return NFACompiler.compile(startOnly, Event.createTypeSerializer(), handleTimeout);
+     }
+ }
+
+ /**
+  * NFA factory for the three-state pattern "start" / "middle" / "end" used by the migration
+  * tests: a {@link StartFilter} event, followed by a {@link SubEvent} accepted by
+  * {@link MiddleFilter}, followed by an {@link EndFilter} event, within a 10 ms window.
+  */
+ private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
+
+     private static final long serialVersionUID = 1173020762472766713L;
+
+     /** Passed through as the last argument of {@code NFACompiler.compile}. */
+     private final boolean handleTimeout;
+
+     /** Creates a factory with {@code handleTimeout} set to {@code false}. */
+     private NFAFactory() {
+         this(false);
+     }
+
+     private NFAFactory(boolean timeoutHandling) {
+         handleTimeout = timeoutHandling;
+     }
+
+     @Override
+     public NFA<Event> createNFA() {
+         final Pattern<Event, ?> startMiddleEnd = Pattern.<Event>begin("start")
+             .where(new StartFilter())
+             .followedBy("middle")
+             .subtype(SubEvent.class)
+             .where(new MiddleFilter())
+             .followedBy("end")
+             .where(new EndFilter())
+             // add a window timeout to test whether timestamps of elements in the
+             // priority queue in CEP operator are correctly checkpointed/restored
+             .within(Time.milliseconds(10L));
+
+         return NFACompiler.compile(startMiddleEnd, Event.createTypeSerializer(), handleTimeout);
+     }
+ }
+
+ /** Accepts events whose name equals "start". */
+ private static class StartFilter implements FilterFunction<Event> {
+     private static final long serialVersionUID = 5726188262756267490L;
+
+     @Override
+     public boolean filter(Event value) throws Exception {
+         final String eventName = value.getName();
+         return eventName.equals("start");
+     }
+ }
+
+ /** Accepts sub events whose volume is strictly greater than 5.0. */
+ private static class MiddleFilter implements FilterFunction<SubEvent> {
+     private static final long serialVersionUID = 6215754202506583964L;
+
+     @Override
+     public boolean filter(SubEvent value) throws Exception {
+         return 5.0 < value.getVolume();
+     }
+ }
+
+ /** Accepts events whose name equals "end". */
+ private static class EndFilter implements FilterFunction<Event> {
+     private static final long serialVersionUID = 7056763917392056548L;
+
+     @Override
+     public boolean filter(Event value) throws Exception {
+         final String eventName = value.getName();
+         return eventName.equals("end");
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2
new file mode 100644
index 0000000..47f710e
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-branching-snapshot-1.2 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2
new file mode 100644
index 0000000..255f46a
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-single-pattern-snapshot-1.2 differ
http://git-wip-us.apache.org/repos/asf/flink/blob/d20fb090/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2
new file mode 100644
index 0000000..c41f6c2
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-starting-snapshot-1.2 differ