You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:19 UTC
[02/10] flink git commit: [FLINK-8725] Separate state from NFA in CEP
library
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 37ad006..3879699 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
- * Tests if the {@link NFA} status ({@link NFA#computationStates} or {@link NFA#eventSharedBuffer})
- * is changed after processing events.
+ * Tests if the {@link NFAState} status is changed after processing events.
*/
public class NFAStatusChangeITCase {
@@ -76,45 +75,47 @@ public class NFAStatusChangeITCase {
NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
NFA<Event> nfa = nfaFactory.createNFA();
- nfa.process(new Event(1, "b", 1.0), 1L);
- assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged());
+ NFAState<Event> nfaState = nfa.createNFAState();
- nfa.resetNFAChanged();
- nfa.process(new Event(2, "a", 1.0), 2L);
- assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged());
+ nfa.process(nfaState, new Event(1, "b", 1.0), 1L);
+ assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
+
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(2, "a", 1.0), 2L);
+ assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
// the status of the queue of ComputationStatus changed,
// more than one ComputationStatus is generated by the event from some ComputationStatus
- nfa.resetNFAChanged();
- nfa.process(new Event(3, "f", 1.0), 3L);
- assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(3, "f", 1.0), 3L);
+ assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
- nfa.resetNFAChanged();
- nfa.process(new Event(4, "f", 1.0), 4L);
- assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(4, "f", 1.0), 4L);
+ assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
- nfa.resetNFAChanged();
- nfa.process(new Event(5, "b", 1.0), 5L);
- assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(5, "b", 1.0), 5L);
+ assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
- nfa.resetNFAChanged();
- nfa.process(new Event(6, "d", 1.0), 6L);
- assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(6, "d", 1.0), 6L);
+ assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
// as the timestamp is within the window
- nfa.resetNFAChanged();
- nfa.process(null, 8L);
- assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, null, 8L);
+ assertFalse("NFA status should not change as the timestamp is within the window", nfaState.isStateChanged());
// timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will
// be removed from eventSharedBuffer as the timeout happens
- nfa.resetNFAChanged();
- Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(null, 12L).f1;
- assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged() && !timeoutResults.isEmpty());
+ nfaState.resetStateChanged();
+ Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(nfaState, null, 12L).f1;
+ assertTrue("NFA status should change as timeout happens", nfaState.isStateChanged() && !timeoutResults.isEmpty());
}
@Test
@@ -143,12 +144,14 @@ public class NFAStatusChangeITCase {
NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
NFA<Event> nfa = nfaFactory.createNFA();
- nfa.resetNFAChanged();
- nfa.process(new Event(6, "start", 1.0), 6L);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(6, "start", 1.0), 6L);
- nfa.resetNFAChanged();
- nfa.process(new Event(6, "a", 1.0), 7L);
- assertTrue(nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(6, "a", 1.0), 7L);
+ assertTrue(nfaState.isStateChanged());
}
@Test
@@ -170,11 +173,13 @@ public class NFAStatusChangeITCase {
NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
NFA<Event> nfa = nfaFactory.createNFA();
- nfa.resetNFAChanged();
- nfa.process(new Event(6, "start", 1.0), 6L);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(6, "start", 1.0), 6L);
- nfa.resetNFAChanged();
- nfa.process(new Event(6, "end", 1.0), 17L);
- assertTrue(nfa.isNFAChanged());
+ nfaState.resetStateChanged();
+ nfa.process(nfaState, new Event(6, "end", 1.0), 17L);
+ assertTrue(nfaState.isStateChanged());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 7721653..5d43111 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
public class NFATest extends TestLogger {
@Test
public void testSimpleNFA() {
- NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false);
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -86,9 +85,12 @@ public class NFATest extends TestLogger {
});
endState.addIgnore(BooleanConditions.<Event>trueFunction());
- nfa.addState(startState);
- nfa.addState(endState);
- nfa.addState(endingState);
+ List<State<Event>> states = new ArrayList<>();
+ states.add(startState);
+ states.add(endState);
+ states.add(endingState);
+
+ NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false, states);
Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
@@ -103,7 +105,7 @@ public class NFATest extends TestLogger {
expectedPatterns.add(firstPattern);
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+ Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -126,7 +128,7 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+ Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -145,7 +147,7 @@ public class NFATest extends TestLogger {
Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+ Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -172,7 +174,7 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+ Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -187,21 +189,25 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "loop", 3.0), 103L));
streamEvents.add(new StreamRecord<>(new Event(4, "loop", 4.0), 104L));
streamEvents.add(new StreamRecord<>(new Event(5, "loop", 5.0), 105L));
- runNFA(nfa, streamEvents);
- NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+ NFAState<Event> nfaState = nfa.createNFAState();
+ runNFA(nfa, nfaState, streamEvents);
+
+ NFAStateSerializer<Event> serializer = new NFAStateSerializer<>(Event.createTypeSerializer());
//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+ serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
baos.close();
}
- public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
+ public <T> Collection<Map<String, List<T>>> runNFA(
+ NFA<T> nfa, NFAState<T> nfaState, List<StreamRecord<T>> inputs) {
Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
for (StreamRecord<T> streamEvent : inputs) {
Collection<Map<String, List<T>>> matchedPatterns = nfa.process(
+ nfaState,
streamEvent.getValue(),
streamEvent.getTimestamp()).f0;
@@ -311,24 +317,26 @@ public class NFATest extends TestLogger {
Event b3 = new Event(41, "b", 5.0);
Event d = new Event(43, "d", 4.0);
- nfa.process(a, 1);
- nfa.process(b, 2);
- nfa.process(c, 3);
- nfa.process(b1, 4);
- nfa.process(b2, 5);
- nfa.process(b3, 6);
- nfa.process(d, 7);
- nfa.process(a, 8);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ nfa.process(nfaState, a, 1);
+ nfa.process(nfaState, b, 2);
+ nfa.process(nfaState, c, 3);
+ nfa.process(nfaState, b1, 4);
+ nfa.process(nfaState, b2, 5);
+ nfa.process(nfaState, b3, 6);
+ nfa.process(nfaState, d, 7);
+ nfa.process(nfaState, a, 8);
- NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+ NFAStateSerializer<Event> serializer = new NFAStateSerializer<>(Event.createTypeSerializer());
//serialize
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+ serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
baos.close();
// copy
- NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+ NFAStateSerializer<Event> copySerializer = new NFAStateSerializer<>(Event.createTypeSerializer());
ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
ByteArrayOutputStream out = new ByteArrayOutputStream();
copySerializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
@@ -337,15 +345,14 @@ public class NFATest extends TestLogger {
// deserialize
ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
- NFA<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+ NFAState<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
bais.close();
- assertEquals(nfa, copy);
+ assertEquals(nfaState, copy);
}
}
private NFA<Event> createStartEndNFA(long windowLength) {
- NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
State<Event> startState = new State<>("start", State.StateType.Start);
State<Event> endState = new State<>("end", State.StateType.Normal);
@@ -373,11 +380,12 @@ public class NFATest extends TestLogger {
});
endState.addIgnore(BooleanConditions.<Event>trueFunction());
- nfa.addState(startState);
- nfa.addState(endState);
- nfa.addState(endingState);
+ List<State<Event>> states = new ArrayList<>();
+ states.add(startState);
+ states.add(endState);
+ states.add(endingState);
- return nfa;
+ return new NFA<>(Event.createTypeSerializer(), windowLength, false, states);
}
private NFA<Event> createLoopingNFA(long windowLength) {
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index a9e1795..b5312cb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -35,16 +35,36 @@ import java.util.Map;
*/
public class NFATestUtilities {
- public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
- return feedNFA(inputEvents, nfa, AfterMatchSkipStrategy.noSkip());
+ public static List<List<Event>> feedNFA(
+ List<StreamRecord<Event>> inputEvents,
+ NFA<Event> nfa) {
+ return feedNFA(inputEvents, nfa, nfa.createNFAState(), AfterMatchSkipStrategy.noSkip());
}
- public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa,
- AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ public static List<List<Event>> feedNFA(
+ List<StreamRecord<Event>> inputEvents,
+ NFA<Event> nfa,
+ NFAState<Event> nfaState) {
+ return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
+ }
+
+ public static List<List<Event>> feedNFA(
+ List<StreamRecord<Event>> inputEvents,
+ NFA<Event> nfa,
+ AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ return feedNFA(inputEvents, nfa, nfa.createNFAState(), afterMatchSkipStrategy);
+ }
+
+ public static List<List<Event>> feedNFA(
+ List<StreamRecord<Event>> inputEvents,
+ NFA<Event> nfa,
+ NFAState<Event> nfaState,
+ AfterMatchSkipStrategy afterMatchSkipStrategy) {
List<List<Event>> resultingPatterns = new ArrayList<>();
for (StreamRecord<Event> inputEvent : inputEvents) {
Collection<Map<String, List<Event>>> patterns = nfa.process(
+ nfaState,
inputEvent.getValue(),
inputEvent.getTimestamp(),
afterMatchSkipStrategy).f0;
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index 357107f..c94d739 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -140,11 +140,13 @@ public void testClearingBuffer() throws Exception {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, b1, c1, d)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -182,13 +184,15 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, d1, d2, d3),
Lists.newArrayList(a1, d1, d2),
Lists.newArrayList(a1, d1)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index f88e5b2..b603174 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -91,13 +91,15 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -137,7 +139,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -145,7 +149,7 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -185,13 +189,15 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -233,12 +239,14 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -278,14 +286,16 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -325,7 +335,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -334,7 +346,7 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -374,14 +386,16 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -502,7 +516,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -510,7 +526,7 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -553,7 +569,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -561,7 +579,7 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
@Test
@@ -604,7 +622,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+ NFAState<Event> nfaState = nfa.createNFAState();
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -613,6 +633,6 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent)
));
- assertTrue(nfa.isEmpty());
+ assertTrue(nfaState.isEmpty());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index ec2cf47..d1b6d59 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -132,7 +133,7 @@ public class NFACompilerTest extends TestLogger {
NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
- Set<State<Event>> states = nfa.getStates();
+ Collection<State<Event>> states = nfa.getStates();
assertEquals(4, states.size());
Map<String, State<Event>> stateMap = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 2fda47f..3151498 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -71,7 +71,7 @@ public class CEPMigrationTest {
@Parameterized.Parameters(name = "Migration Savepoint: {0}")
public static Collection<MigrationVersion> parameters () {
- return Arrays.asList(MigrationVersion.v1_3, MigrationVersion.v1_4);
+ return Arrays.asList(MigrationVersion.v1_5);
}
public CEPMigrationTest(MigrationVersion migrateVersion) {
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 98cb468..5a98445 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -462,9 +462,9 @@ public class CEPOperatorTest extends TestLogger {
try {
harness.open();
- final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
+ final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
- Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
+ Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
Event startEvent = new Event(42, "c", 1.0);
SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -507,9 +507,9 @@ public class CEPOperatorTest extends TestLogger {
harness.open();
- final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
+ final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
- Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
+ Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
Event startEvent = new Event(42, "c", 1.0);
SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -568,8 +568,8 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(2L, harness.numEventTimeTimers());
assertEquals(4L, operator.getPQSize(42));
assertEquals(1L, operator.getPQSize(43));
- assertTrue(!operator.hasNonEmptyNFA(42));
- assertTrue(!operator.hasNonEmptyNFA(43));
+ assertTrue(!operator.hasNonEmptyNFAState(42));
+ assertTrue(!operator.hasNonEmptyNFAState(43));
harness.processWatermark(new Watermark(2L));
@@ -581,9 +581,9 @@ public class CEPOperatorTest extends TestLogger {
// for 43 the element entered the NFA and the PQ is empty
assertEquals(2L, harness.numEventTimeTimers());
- assertTrue(operator.hasNonEmptyNFA(42));
+ assertTrue(operator.hasNonEmptyNFAState(42));
assertEquals(1L, operator.getPQSize(42));
- assertTrue(operator.hasNonEmptyNFA(43));
+ assertTrue(operator.hasNonEmptyNFAState(43));
assertTrue(!operator.hasNonEmptyPQ(43));
harness.processElement(new StreamRecord<>(startEvent2, 4L));
@@ -605,9 +605,9 @@ public class CEPOperatorTest extends TestLogger {
// now we have 1 key because the 43 expired and was removed.
// 42 is still there due to startEvent2
assertEquals(1L, harness.numEventTimeTimers());
- assertTrue(operator2.hasNonEmptyNFA(42));
+ assertTrue(operator2.hasNonEmptyNFAState(42));
assertTrue(!operator2.hasNonEmptyPQ(42));
- assertTrue(!operator2.hasNonEmptyNFA(43));
+ assertTrue(!operator2.hasNonEmptyNFAState(43));
assertTrue(!operator2.hasNonEmptyPQ(43));
verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
@@ -622,7 +622,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processWatermark(20L);
harness.processWatermark(21L);
- assertTrue(!operator2.hasNonEmptyNFA(42));
+ assertTrue(!operator2.hasNonEmptyNFAState(42));
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
@@ -665,7 +665,7 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(7L, operator.getPQSize(41));
- assertTrue(!operator.hasNonEmptyNFA(41));
+ assertTrue(!operator.hasNonEmptyNFAState(41));
harness.processWatermark(new Watermark(2L));
@@ -674,7 +674,7 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(6L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
+ assertTrue(operator.hasNonEmptyNFAState(41)); // processed the first element
harness.processWatermark(new Watermark(8L));
@@ -714,12 +714,12 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(0L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFA(41));
+ assertTrue(operator.hasNonEmptyNFAState(41));
harness.processWatermark(new Watermark(17L));
verifyWatermark(harness.getOutput().poll(), 17L);
- assertTrue(!operator.hasNonEmptyNFA(41));
+ assertTrue(!operator.hasNonEmptyNFAState(41));
assertTrue(!operator.hasNonEmptyPQ(41));
assertEquals(0L, harness.numEventTimeTimers());
} finally {
@@ -800,8 +800,8 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator.hasNonEmptyPQ(42));
assertTrue(!operator.hasNonEmptyPQ(43));
- assertTrue(operator.hasNonEmptyNFA(42));
- assertTrue(operator.hasNonEmptyNFA(43));
+ assertTrue(operator.hasNonEmptyNFAState(42));
+ assertTrue(operator.hasNonEmptyNFAState(43));
harness.setProcessingTime(3L);
@@ -834,10 +834,10 @@ public class CEPOperatorTest extends TestLogger {
harness.setProcessingTime(21L);
- assertTrue(operator2.hasNonEmptyNFA(42));
+ assertTrue(operator2.hasNonEmptyNFAState(42));
harness.processElement(new StreamRecord<>(startEvent1, 21L));
- assertTrue(operator2.hasNonEmptyNFA(42));
+ assertTrue(operator2.hasNonEmptyNFAState(42));
harness.setProcessingTime(49L);
@@ -845,7 +845,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
// the pattern expired
- assertTrue(!operator2.hasNonEmptyNFA(42));
+ assertTrue(!operator2.hasNonEmptyNFAState(42));
assertEquals(0L, harness.numEventTimeTimers());
assertTrue(!operator2.hasNonEmptyPQ(42));
@@ -988,12 +988,12 @@ public class CEPOperatorTest extends TestLogger {
harness
.processElement(new StreamRecord<>(new SubEvent(42, "barfoo", 1.0, 5.0), 0L));
- assertTrue(!operator.hasNonEmptyNFA(42));
- assertTrue(!operator.hasNonEmptyNFA(43));
+ assertTrue(!operator.hasNonEmptyNFAState(42));
+ assertTrue(!operator.hasNonEmptyNFAState(43));
harness.setProcessingTime(3L);
- assertTrue(operator.hasNonEmptyNFA(42));
- assertTrue(operator.hasNonEmptyNFA(43));
+ assertTrue(operator.hasNonEmptyNFAState(42));
+ assertTrue(operator.hasNonEmptyNFAState(43));
harness.processElement(new StreamRecord<>(middleEvent2, 3L));
harness.processElement(new StreamRecord<>(middleEvent1, 3L));
@@ -1047,14 +1047,14 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(operator.hasNonEmptyPQ(42));
assertTrue(operator.hasNonEmptyPQ(43));
- assertTrue(!operator.hasNonEmptyNFA(42));
- assertTrue(!operator.hasNonEmptyNFA(43));
+ assertTrue(!operator.hasNonEmptyNFAState(42));
+ assertTrue(!operator.hasNonEmptyNFAState(43));
harness.processWatermark(3L);
assertTrue(!operator.hasNonEmptyPQ(42));
assertTrue(!operator.hasNonEmptyPQ(43));
- assertTrue(operator.hasNonEmptyNFA(42));
- assertTrue(operator.hasNonEmptyNFA(43));
+ assertTrue(operator.hasNonEmptyNFAState(42));
+ assertTrue(operator.hasNonEmptyNFAState(43));
harness.processElement(new StreamRecord<>(startEvent2, 4L));
harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot
deleted file mode 100644
index 34c5110..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot
new file mode 100644
index 0000000..73934e5
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot
deleted file mode 100644
index d4d3405..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot
new file mode 100644
index 0000000..3126ff4
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot
deleted file mode 100644
index c87f307..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot
new file mode 100644
index 0000000..63b1b49
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot
deleted file mode 100644
index 660a21a..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot and /dev/null differ
http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot
new file mode 100644
index 0000000..1b1fa75
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot differ