You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:21 UTC
[04/10] flink git commit: [FLINK-9418] Migrate SharedBuffer to use
MapState
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index c94d739..88504e9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -36,7 +35,8 @@ import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.junit.Assert.assertEquals;
/**
* Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps.
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertTrue;
public class SameElementITCase extends TestLogger {
@Test
- public void testEagerZeroOrMoreSameElement() {
+ public void testEagerZeroOrMoreSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -87,7 +87,7 @@ public class SameElementITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -138,15 +138,16 @@ public void testClearingBuffer() throws Exception {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, b1, c1, d)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -182,9 +183,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
@@ -192,11 +193,12 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
Lists.newArrayList(a1, d1, d2),
Lists.newArrayList(a1, d1)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
- public void testZeroOrMoreSameElement() {
+ public void testZeroOrMoreSameElement() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -239,7 +241,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -320,7 +322,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -375,7 +377,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -418,7 +420,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
}
}).oneOrMore().optional();
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -481,7 +483,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
deleted file mode 100644
index 566e2b9..0000000
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.cep.Event;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link SharedBuffer}.
- */
-public class SharedBufferTest extends TestLogger {
-
- @Test
- public void testSharedBuffer() {
- SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
- int numberEvents = 8;
- Event[] events = new Event[numberEvents];
- final long timestamp = 1L;
-
- for (int i = 0; i < numberEvents; i++) {
- events[i] = new Event(i + 1, "e" + (i + 1), i);
- }
-
- Map<String, List<Event>> expectedPattern1 = new HashMap<>();
- expectedPattern1.put("a1", new ArrayList<Event>());
- expectedPattern1.get("a1").add(events[2]);
-
- expectedPattern1.put("a[]", new ArrayList<Event>());
- expectedPattern1.get("a[]").add(events[3]);
-
- expectedPattern1.put("b", new ArrayList<Event>());
- expectedPattern1.get("b").add(events[5]);
-
- Map<String, List<Event>> expectedPattern2 = new HashMap<>();
- expectedPattern2.put("a1", new ArrayList<Event>());
- expectedPattern2.get("a1").add(events[0]);
-
- expectedPattern2.put("a[]", new ArrayList<Event>());
- expectedPattern2.get("a[]").add(events[1]);
- expectedPattern2.get("a[]").add(events[2]);
- expectedPattern2.get("a[]").add(events[3]);
- expectedPattern2.get("a[]").add(events[4]);
-
- expectedPattern2.put("b", new ArrayList<Event>());
- expectedPattern2.get("b").add(events[5]);
-
- Map<String, List<Event>> expectedPattern3 = new HashMap<>();
- expectedPattern3.put("a1", new ArrayList<Event>());
- expectedPattern3.get("a1").add(events[0]);
-
- expectedPattern3.put("a[]", new ArrayList<Event>());
- expectedPattern3.get("a[]").add(events[1]);
- expectedPattern3.get("a[]").add(events[2]);
- expectedPattern3.get("a[]").add(events[3]);
- expectedPattern3.get("a[]").add(events[4]);
- expectedPattern3.get("a[]").add(events[5]);
- expectedPattern3.get("a[]").add(events[6]);
-
- expectedPattern3.put("b", new ArrayList<Event>());
- expectedPattern3.get("b").add(events[7]);
-
- sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
- sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));
- sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, 0, DeweyNumber.fromString("2.0"));
- sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
- sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
- sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, 1, DeweyNumber.fromString("2.0.0"));
- sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
- sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
-
- Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
- sharedBuffer.release("b", events[7], timestamp, 7);
- Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
-
- Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
- Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
- sharedBuffer.release("b", events[5], timestamp, 2);
- sharedBuffer.release("b", events[5], timestamp, 5);
-
- assertEquals(1L, patterns3.size());
- assertEquals(0L, patterns4.size());
- assertEquals(1L, patterns1.size());
- assertEquals(1L, patterns2.size());
-
- assertTrue(sharedBuffer.isEmpty());
- assertTrue(patterns4.isEmpty());
- assertEquals(Collections.singletonList(expectedPattern1), patterns1);
- assertEquals(Collections.singletonList(expectedPattern2), patterns2);
- assertEquals(Collections.singletonList(expectedPattern3), patterns3);
- }
-
- @Test
- public void testSharedBufferSerialization() throws IOException, ClassNotFoundException {
- SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
- int numberEvents = 8;
- Event[] events = new Event[numberEvents];
- final long timestamp = 1L;
-
- for (int i = 0; i < numberEvents; i++) {
- events[i] = new Event(i + 1, "e" + (i + 1), i);
- }
-
- sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
- sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));
- sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, 0, DeweyNumber.fromString("2.0"));
- sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
- sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
- sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, 1, DeweyNumber.fromString("2.0.0"));
- sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
- sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
-
- SharedBuffer.SharedBufferSerializer serializer = new SharedBuffer.SharedBufferSerializer(
- StringSerializer.INSTANCE, Event.createTypeSerializer());
-
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos));
-
- ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
- SharedBuffer<String, Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
-
- assertEquals(sharedBuffer, copy);
- }
-
- @Test
- public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
- SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
- int numberEvents = 8;
- Event[] events = new Event[numberEvents];
- final long timestamp = 1L;
-
- for (int i = 0; i < numberEvents; i++) {
- events[i] = new Event(i + 1, "e" + (i + 1), i);
- }
-
- sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
- sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, 0, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, 0, DeweyNumber.fromString("1.1"));
- sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, 1, DeweyNumber.fromString("1.0.0"));
- sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, 2, DeweyNumber.fromString("1.0.0.0"));
- sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, 2, DeweyNumber.fromString("1.1.0"));
-
- //simulate IGNORE (next event can point to events[2])
- sharedBuffer.lock("branching", events[2], timestamp, 1);
-
- sharedBuffer.release("branching", events[4], timestamp, 3);
-
- //There should be still events[1] and events[2] in the buffer
- assertFalse(sharedBuffer.isEmpty());
- }
-
- @Test
- public void testSharedBufferExtractOrder() {
- SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
- int numberEvents = 10;
- Event[] events = new Event[numberEvents];
- final long timestamp = 1L;
-
- for (int i = 0; i < numberEvents; i++) {
- events[i] = new Event(i + 1, "e" + (i + 1), i);
- }
-
- Map<String, List<Event>> expectedResult = new LinkedHashMap<>();
- expectedResult.put("a", new ArrayList<>());
- expectedResult.get("a").add(events[1]);
- expectedResult.put("b", new ArrayList<>());
- expectedResult.get("b").add(events[2]);
- expectedResult.put("aa", new ArrayList<>());
- expectedResult.get("aa").add(events[3]);
- expectedResult.put("bb", new ArrayList<>());
- expectedResult.get("bb").add(events[4]);
- expectedResult.put("c", new ArrayList<>());
- expectedResult.get("c").add(events[5]);
-
- sharedBuffer.put("a", events[1], timestamp, DeweyNumber.fromString("1"));
- sharedBuffer.put("b", events[2], timestamp, "a", events[1], timestamp, 0, DeweyNumber.fromString("1.0"));
- sharedBuffer.put("aa", events[3], timestamp, "b", events[2], timestamp, 1, DeweyNumber.fromString("1.0.0"));
- sharedBuffer.put("bb", events[4], timestamp, "aa", events[3], timestamp, 2, DeweyNumber.fromString("1.0.0.0"));
- sharedBuffer.put("c", events[5], timestamp, "bb", events[4], timestamp, 3, DeweyNumber.fromString("1.0.0.0.0"));
-
- Collection<Map<String, List<Event>>> patternsResult = sharedBuffer.extractPatterns("c", events[5], timestamp, 4, DeweyNumber.fromString("1.0.0.0.0"));
-
- List<String> expectedOrder = new ArrayList<>();
- expectedOrder.add("a");
- expectedOrder.add("b");
- expectedOrder.add("aa");
- expectedOrder.add("bb");
- expectedOrder.add("c");
-
- List<String> resultOrder = new ArrayList<>();
- for (String key: patternsResult.iterator().next().keySet()){
- resultOrder.add(key);
- }
- assertEquals(expectedOrder, resultOrder);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index 4e540dd..049c84b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,13 +33,14 @@ import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
* Tests for {@link Pattern#timesOrMore(int)}.
*/
public class TimesOrMoreITCase extends TestLogger {
@Test
- public void testTimesOrMore() {
+ public void testTimesOrMore() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -79,7 +79,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -91,7 +91,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNonStrict() {
+ public void testTimesOrMoreNonStrict() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -126,7 +126,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -139,7 +139,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreStrict() {
+ public void testTimesOrMoreStrict() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -174,7 +174,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -184,7 +184,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreStrictOptional() {
+ public void testTimesOrMoreStrictOptional() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -219,7 +219,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -230,7 +230,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreStrictOptional2() {
+ public void testTimesOrMoreStrictOptional2() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -263,7 +263,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -275,7 +275,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNonStrictOptional() {
+ public void testTimesOrMoreNonStrictOptional() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -306,7 +306,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -316,7 +316,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNonStrictOptional2() {
+ public void testTimesOrMoreNonStrictOptional2() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -351,7 +351,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -365,7 +365,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNonStrictOptional3() {
+ public void testTimesOrMoreNonStrictOptional3() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -400,7 +400,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -413,7 +413,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNonStrictWithNext() {
+ public void testTimesOrMoreNonStrictWithNext() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -448,7 +448,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -460,7 +460,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNotStrictWithFollowedBy() {
+ public void testTimesOrMoreNotStrictWithFollowedBy() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -493,7 +493,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -504,7 +504,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
@Test
- public void testTimesOrMoreNotStrictWithFollowedByAny() {
+ public void testTimesOrMoreNotStrictWithFollowedByAny() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -537,7 +537,7 @@ public class TimesOrMoreITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 76ed26a..203a1c2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,6 +33,7 @@ import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
* Tests for {@link Pattern#times(int, int)}.
@@ -42,7 +42,7 @@ import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
public class TimesRangeITCase extends TestLogger {
@Test
- public void testTimesRange() {
+ public void testTimesRange() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -80,7 +80,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -93,7 +93,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeFromZero() {
+ public void testTimesRangeFromZero() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
Event startEvent = new Event(40, "c", 1.0);
@@ -131,7 +131,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -144,7 +144,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNonStrict() {
+ public void testTimesRangeNonStrict() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -178,7 +178,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -194,7 +194,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeStrict() {
+ public void testTimesRangeStrict() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -228,7 +228,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -242,7 +242,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeStrictOptional() {
+ public void testTimesRangeStrictOptional() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -276,7 +276,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -290,7 +290,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeStrictOptional1() {
+ public void testTimesRangeStrictOptional1() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -322,7 +322,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -335,7 +335,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNonStrictOptional1() {
+ public void testTimesRangeNonStrictOptional1() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -365,7 +365,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -375,7 +375,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNonStrictOptional2() {
+ public void testTimesRangeNonStrictOptional2() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -409,7 +409,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -423,7 +423,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNonStrictOptional3() {
+ public void testTimesRangeNonStrictOptional3() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -457,7 +457,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -470,7 +470,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNonStrictWithNext() {
+ public void testTimesRangeNonStrictWithNext() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -504,7 +504,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -516,7 +516,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNotStrictWithFollowedBy() {
+ public void testTimesRangeNotStrictWithFollowedBy() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -548,7 +548,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -559,7 +559,7 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
- public void testTimesRangeNotStrictWithFollowedByAny() {
+ public void testTimesRangeNotStrictWithFollowedByAny() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -591,7 +591,7 @@ public class TimesRangeITCase extends TestLogger {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index b603174..fb7f086 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -19,7 +19,6 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -34,7 +33,8 @@ import java.util.List;
import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.junit.Assert.assertEquals;
/**
* Tests for {@link Pattern#until(IterativeCondition)}.
@@ -89,9 +89,9 @@ public class UntilConditionITCase {
UNTIL_CONDITION
);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -99,7 +99,9 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfaState.isEmpty());
+
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -137,9 +139,9 @@ public class UntilConditionITCase {
}).oneOrMore().allowCombinations().until(UNTIL_CONDITION)
.followedBy("end").where(UNTIL_CONDITION);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -149,7 +151,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -187,9 +190,9 @@ public class UntilConditionITCase {
UNTIL_CONDITION
);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -197,7 +200,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -237,16 +241,17 @@ public class UntilConditionITCase {
UNTIL_CONDITION
);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -284,9 +289,9 @@ public class UntilConditionITCase {
UNTIL_CONDITION
);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -295,7 +300,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -333,9 +339,9 @@ public class UntilConditionITCase {
}).oneOrMore().optional().allowCombinations().until(UNTIL_CONDITION)
.followedBy("end").where(UNTIL_CONDITION);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -346,7 +352,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -384,9 +391,9 @@ public class UntilConditionITCase {
UNTIL_CONDITION
);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -395,7 +402,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1, breaking),
Lists.newArrayList(startEvent, breaking)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -430,7 +438,7 @@ public class UntilConditionITCase {
}
}).oneOrMore().until(UNTIL_CONDITION);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -474,7 +482,7 @@ public class UntilConditionITCase {
}
}).oneOrMore().optional().until(UNTIL_CONDITION);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
@@ -514,9 +522,9 @@ public class UntilConditionITCase {
}
}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -526,7 +534,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -567,9 +576,9 @@ public class UntilConditionITCase {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -579,7 +588,8 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent, middleEvent1)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
@Test
@@ -620,9 +630,9 @@ public class UntilConditionITCase {
}
});
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ NFA<Event> nfa = compile(pattern, false);
- NFAState<Event> nfaState = nfa.createNFAState();
+ NFAState nfaState = nfa.createInitialNFAState();
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
@@ -633,6 +643,7 @@ public class UntilConditionITCase {
Lists.newArrayList(startEvent)
));
- assertTrue(nfaState.isEmpty());
+ assertEquals(1, nfaState.getComputationStates().size());
+ assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d1b6d59..92f59a5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,10 +18,7 @@
package org.apache.flink.cep.nfa.compiler;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
@@ -46,6 +43,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -72,9 +70,6 @@ public class NFACompilerTest extends TestLogger {
}
};
- private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
- .createSerializer(new ExecutionConfig());
-
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -90,7 +85,7 @@ public class NFACompilerTest extends TestLogger {
.followedBy("start").where(new TestFilter());
// here we must have an exception because of the two "start" patterns with the same name.
- NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+ compile(invalidPattern, false);
}
@Test
@@ -105,7 +100,7 @@ public class NFACompilerTest extends TestLogger {
.notFollowedBy("end").where(new TestFilter());
// here we must have an exception because of the two "start" patterns with the same name.
- NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+ compile(invalidPattern, false);
}
/**
@@ -131,7 +126,7 @@ public class NFACompilerTest extends TestLogger {
.followedBy("middle").subtype(SubEvent.class)
.next("end").where(endFilter);
- NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
+ NFA<Event> nfa = compile(pattern, false);
Collection<State<Event>> states = nfa.getStates();
assertEquals(4, states.size());
@@ -218,7 +213,7 @@ public class NFACompilerTest extends TestLogger {
}
});
- NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+ compile(invalidPattern, false);
}
private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
new file mode 100644
index 0000000..9be7cc1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SharedBuffer}.
+ */
+public class SharedBufferTest extends TestLogger {
+
+ @Test
+ public void testSharedBuffer() throws Exception {
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+ int numberEvents = 8;
+ Event[] events = new Event[numberEvents];
+ EventId[] eventIds = new EventId[numberEvents];
+ final long timestamp = 1L;
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+ }
+
+ Map<String, List<Event>> expectedPattern1 = new HashMap<>();
+ expectedPattern1.put("a1", new ArrayList<>());
+ expectedPattern1.get("a1").add(events[2]);
+
+ expectedPattern1.put("a[]", new ArrayList<>());
+ expectedPattern1.get("a[]").add(events[3]);
+
+ expectedPattern1.put("b", new ArrayList<>());
+ expectedPattern1.get("b").add(events[5]);
+
+ Map<String, List<Event>> expectedPattern2 = new HashMap<>();
+ expectedPattern2.put("a1", new ArrayList<>());
+ expectedPattern2.get("a1").add(events[0]);
+
+ expectedPattern2.put("a[]", new ArrayList<>());
+ expectedPattern2.get("a[]").add(events[1]);
+ expectedPattern2.get("a[]").add(events[2]);
+ expectedPattern2.get("a[]").add(events[3]);
+ expectedPattern2.get("a[]").add(events[4]);
+
+ expectedPattern2.put("b", new ArrayList<>());
+ expectedPattern2.get("b").add(events[5]);
+
+ Map<String, List<Event>> expectedPattern3 = new HashMap<>();
+ expectedPattern3.put("a1", new ArrayList<>());
+ expectedPattern3.get("a1").add(events[0]);
+
+ expectedPattern3.put("a[]", new ArrayList<>());
+ expectedPattern3.get("a[]").add(events[1]);
+ expectedPattern3.get("a[]").add(events[2]);
+ expectedPattern3.get("a[]").add(events[3]);
+ expectedPattern3.get("a[]").add(events[4]);
+ expectedPattern3.get("a[]").add(events[5]);
+ expectedPattern3.get("a[]").add(events[6]);
+
+ expectedPattern3.put("b", new ArrayList<>());
+ expectedPattern3.get("b").add(events[7]);
+
+ NodeId a10 = sharedBuffer.put("a1", eventIds[0], DeweyNumber.fromString("1"));
+ NodeId aLoop0 = sharedBuffer.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
+ NodeId a11 = sharedBuffer.put("a1", eventIds[2], DeweyNumber.fromString("2"));
+ NodeId aLoop1 = sharedBuffer.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
+ NodeId aLoop2 = sharedBuffer.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
+ NodeId aSecondLoop0 = sharedBuffer.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
+ NodeId aLoop3 = sharedBuffer.put("a[]", eventIds[4], aLoop2, DeweyNumber.fromString("1.0"));
+ NodeId b0 = sharedBuffer.put("b", eventIds[5], aLoop3, DeweyNumber.fromString("1.0.0"));
+ NodeId aLoop4 = sharedBuffer.put("a[]", eventIds[5], aLoop3, DeweyNumber.fromString("1.1"));
+ NodeId b1 = sharedBuffer.put("b", eventIds[5], aSecondLoop0, DeweyNumber.fromString("2.0.0"));
+ NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
+ NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
+
+ Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+ sharedBuffer.releaseNode(b3);
+ Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+
+ Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1, DeweyNumber.fromString("2.0.0"));
+ Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0, DeweyNumber.fromString("1.0.0"));
+ sharedBuffer.releaseNode(b0);
+ sharedBuffer.releaseNode(b1);
+
+ for (EventId eventId : eventIds) {
+ sharedBuffer.releaseEvent(eventId);
+ }
+
+ assertEquals(1L, patterns3.size());
+ assertEquals(0L, patterns4.size());
+ assertEquals(1L, patterns1.size());
+ assertEquals(1L, patterns2.size());
+
+ assertTrue(sharedBuffer.isEmpty());
+ assertTrue(patterns4.isEmpty());
+ assertEquals(Collections.singletonList(expectedPattern1), patterns1);
+ assertEquals(Collections.singletonList(expectedPattern2), patterns2);
+ assertEquals(Collections.singletonList(expectedPattern3), patterns3);
+ }
+
+ @Test
+ public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() throws Exception {
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+ int numberEvents = 8;
+ Event[] events = new Event[numberEvents];
+ EventId[] eventIds = new EventId[numberEvents];
+ final long timestamp = 1L;
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+ }
+
+ NodeId start = sharedBuffer.put("start", eventIds[1], DeweyNumber.fromString("1"));
+ NodeId b0 = sharedBuffer.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
+ NodeId b1 = sharedBuffer.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
+ NodeId b00 = sharedBuffer.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
+ sharedBuffer.put("branching", eventIds[4], b00, DeweyNumber.fromString("1.0.0.0"));
+ NodeId b10 = sharedBuffer.put("branching", eventIds[4], b1, DeweyNumber.fromString("1.1.0"));
+
+ //simulate IGNORE (next event can point to events[2])
+ sharedBuffer.lockNode(b0);
+
+ sharedBuffer.releaseNode(b10);
+
+ for (EventId eventId : eventIds) {
+ sharedBuffer.releaseEvent(eventId);
+ }
+
+ //There should be still events[1] and events[2] in the buffer
+ assertFalse(sharedBuffer.isEmpty());
+ }
+
+ @Test
+ public void testSharedBufferExtractOrder() throws Exception {
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+ int numberEvents = 5;
+ Event[] events = new Event[numberEvents];
+ EventId[] eventIds = new EventId[numberEvents];
+ final long timestamp = 1L;
+
+ for (int i = 0; i < numberEvents; i++) {
+ events[i] = new Event(i + 1, "e" + (i + 1), i);
+ eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+ }
+
+ Map<String, List<Event>> expectedResult = new LinkedHashMap<>();
+ expectedResult.put("a", new ArrayList<>());
+ expectedResult.get("a").add(events[0]);
+ expectedResult.put("b", new ArrayList<>());
+ expectedResult.get("b").add(events[1]);
+ expectedResult.put("aa", new ArrayList<>());
+ expectedResult.get("aa").add(events[2]);
+ expectedResult.put("bb", new ArrayList<>());
+ expectedResult.get("bb").add(events[3]);
+ expectedResult.put("c", new ArrayList<>());
+ expectedResult.get("c").add(events[4]);
+
+ NodeId a = sharedBuffer.put("a", eventIds[0], DeweyNumber.fromString("1"));
+ NodeId b = sharedBuffer.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
+ NodeId aa = sharedBuffer.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
+ NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
+ NodeId c = sharedBuffer.put("c", eventIds[4], bb, DeweyNumber.fromString("1.0.0.0.0"));
+
+ Collection<Map<String, List<Event>>> patternsResult = sharedBuffer.extractPatterns(c,
+ DeweyNumber.fromString("1.0.0.0.0"));
+
+ List<String> expectedOrder = new ArrayList<>();
+ expectedOrder.add("a");
+ expectedOrder.add("b");
+ expectedOrder.add("aa");
+ expectedOrder.add("bb");
+ expectedOrder.add("c");
+
+ for (EventId eventId : eventIds) {
+ sharedBuffer.releaseEvent(eventId);
+ }
+
+ List<String> resultOrder = new ArrayList<>(patternsResult.iterator().next().keySet());
+ assertEquals(expectedOrder, resultOrder);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 9b120ca..6c7543a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -551,7 +551,7 @@ public class CEPMigrationTest {
}
@Test
- public void testAndOrSubtypConditionsAfterMigration() throws Exception {
+ public void testAndOrSubtypeConditionsAfterMigration() throws Exception {
KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
private static final long serialVersionUID = -4873366487571254798L;
@@ -625,7 +625,7 @@ public class CEPMigrationTest {
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
.within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
@@ -653,7 +653,7 @@ public class CEPMigrationTest {
.times(2)
.within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
@@ -684,7 +684,7 @@ public class CEPMigrationTest {
// priority queue in CEP operator are correctly checkpointed/restored
.within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 5a98445..4786484 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -64,6 +64,7 @@ import java.util.Map;
import java.util.Queue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.validateMockitoUsage;
@@ -462,9 +463,9 @@ public class CEPOperatorTest extends TestLogger {
try {
harness.open();
- final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
+ final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "computationStates");
final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
- Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
+ Whitebox.setInternalState(operator, "computationStates", nfaOperatorStateSpy);
Event startEvent = new Event(42, "c", 1.0);
SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -476,7 +477,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<>(endEvent, 4L));
// verify the number of invocations NFA is updated
- Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());
+ Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
// get and verify the output
Queue<Object> result = harness.getOutput();
@@ -507,9 +508,9 @@ public class CEPOperatorTest extends TestLogger {
harness.open();
- final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
+ final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "computationStates");
final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
- Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
+ Whitebox.setInternalState(operator, "computationStates", nfaOperatorStateSpy);
Event startEvent = new Event(42, "c", 1.0);
SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -521,7 +522,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<>(endEvent, 4L));
// verify the number of invocations NFA is updated
- Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());
+ Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
// get and verify the output
Queue<Object> result = harness.getOutput();
@@ -568,8 +569,8 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(2L, harness.numEventTimeTimers());
assertEquals(4L, operator.getPQSize(42));
assertEquals(1L, operator.getPQSize(43));
- assertTrue(!operator.hasNonEmptyNFAState(42));
- assertTrue(!operator.hasNonEmptyNFAState(43));
+ assertTrue(!operator.hasNonEmptySharedBuffer(42));
+ assertTrue(!operator.hasNonEmptySharedBuffer(43));
harness.processWatermark(new Watermark(2L));
@@ -581,9 +582,9 @@ public class CEPOperatorTest extends TestLogger {
// for 43 the element entered the NFA and the PQ is empty
assertEquals(2L, harness.numEventTimeTimers());
- assertTrue(operator.hasNonEmptyNFAState(42));
+ assertTrue(operator.hasNonEmptySharedBuffer(42));
assertEquals(1L, operator.getPQSize(42));
- assertTrue(operator.hasNonEmptyNFAState(43));
+ assertTrue(operator.hasNonEmptySharedBuffer(43));
assertTrue(!operator.hasNonEmptyPQ(43));
harness.processElement(new StreamRecord<>(startEvent2, 4L));
@@ -605,9 +606,9 @@ public class CEPOperatorTest extends TestLogger {
// now we have 1 key because the 43 expired and was removed.
// 42 is still there due to startEvent2
assertEquals(1L, harness.numEventTimeTimers());
- assertTrue(operator2.hasNonEmptyNFAState(42));
+ assertTrue(operator2.hasNonEmptySharedBuffer(42));
assertTrue(!operator2.hasNonEmptyPQ(42));
- assertTrue(!operator2.hasNonEmptyNFAState(43));
+ assertTrue(!operator2.hasNonEmptySharedBuffer(43));
assertTrue(!operator2.hasNonEmptyPQ(43));
verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
@@ -622,7 +623,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processWatermark(20L);
harness.processWatermark(21L);
- assertTrue(!operator2.hasNonEmptyNFAState(42));
+ assertTrue(!operator2.hasNonEmptySharedBuffer(42));
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
@@ -665,7 +666,7 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(7L, operator.getPQSize(41));
- assertTrue(!operator.hasNonEmptyNFAState(41));
+ assertTrue(!operator.hasNonEmptySharedBuffer(41));
harness.processWatermark(new Watermark(2L));
@@ -674,7 +675,7 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(6L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFAState(41)); // processed the first element
+ assertTrue(operator.hasNonEmptySharedBuffer(41)); // processed the first element
harness.processWatermark(new Watermark(8L));
@@ -714,12 +715,12 @@ public class CEPOperatorTest extends TestLogger {
assertEquals(1L, harness.numEventTimeTimers());
assertEquals(0L, operator.getPQSize(41));
- assertTrue(operator.hasNonEmptyNFAState(41));
+ assertTrue(operator.hasNonEmptySharedBuffer(41));
harness.processWatermark(new Watermark(17L));
verifyWatermark(harness.getOutput().poll(), 17L);
- assertTrue(!operator.hasNonEmptyNFAState(41));
+ assertTrue(!operator.hasNonEmptySharedBuffer(41));
assertTrue(!operator.hasNonEmptyPQ(41));
assertEquals(0L, harness.numEventTimeTimers());
} finally {
@@ -800,8 +801,8 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator.hasNonEmptyPQ(42));
assertTrue(!operator.hasNonEmptyPQ(43));
- assertTrue(operator.hasNonEmptyNFAState(42));
- assertTrue(operator.hasNonEmptyNFAState(43));
+ assertTrue(operator.hasNonEmptySharedBuffer(42));
+ assertTrue(operator.hasNonEmptySharedBuffer(43));
harness.setProcessingTime(3L);
@@ -834,10 +835,10 @@ public class CEPOperatorTest extends TestLogger {
harness.setProcessingTime(21L);
- assertTrue(operator2.hasNonEmptyNFAState(42));
+ assertTrue(operator2.hasNonEmptySharedBuffer(42));
harness.processElement(new StreamRecord<>(startEvent1, 21L));
- assertTrue(operator2.hasNonEmptyNFAState(42));
+ assertTrue(operator2.hasNonEmptySharedBuffer(42));
harness.setProcessingTime(49L);
@@ -845,7 +846,7 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
// the pattern expired
- assertTrue(!operator2.hasNonEmptyNFAState(42));
+ assertTrue(!operator2.hasNonEmptySharedBuffer(42));
assertEquals(0L, harness.numEventTimeTimers());
assertTrue(!operator2.hasNonEmptyPQ(42));
@@ -910,7 +911,7 @@ public class CEPOperatorTest extends TestLogger {
@Override
public NFA<Event> createNFA() {
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+ return NFACompiler.compileFactory(pattern, false).createNFA();
}
});
@@ -988,12 +989,12 @@ public class CEPOperatorTest extends TestLogger {
harness
.processElement(new StreamRecord<>(new SubEvent(42, "barfoo", 1.0, 5.0), 0L));
- assertTrue(!operator.hasNonEmptyNFAState(42));
- assertTrue(!operator.hasNonEmptyNFAState(43));
+ assertTrue(!operator.hasNonEmptySharedBuffer(42));
+ assertTrue(!operator.hasNonEmptySharedBuffer(43));
harness.setProcessingTime(3L);
- assertTrue(operator.hasNonEmptyNFAState(42));
- assertTrue(operator.hasNonEmptyNFAState(43));
+ assertTrue(operator.hasNonEmptySharedBuffer(42));
+ assertTrue(operator.hasNonEmptySharedBuffer(43));
harness.processElement(new StreamRecord<>(middleEvent2, 3L));
harness.processElement(new StreamRecord<>(middleEvent1, 3L));
@@ -1047,14 +1048,14 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(operator.hasNonEmptyPQ(42));
assertTrue(operator.hasNonEmptyPQ(43));
- assertTrue(!operator.hasNonEmptyNFAState(42));
- assertTrue(!operator.hasNonEmptyNFAState(43));
+ assertFalse(operator.hasNonEmptySharedBuffer(42));
+ assertFalse(operator.hasNonEmptySharedBuffer(43));
harness.processWatermark(3L);
- assertTrue(!operator.hasNonEmptyPQ(42));
- assertTrue(!operator.hasNonEmptyPQ(43));
- assertTrue(operator.hasNonEmptyNFAState(42));
- assertTrue(operator.hasNonEmptyNFAState(43));
+ assertFalse(operator.hasNonEmptyPQ(42));
+ assertFalse(operator.hasNonEmptyPQ(43));
+ assertTrue(operator.hasNonEmptySharedBuffer(42));
+ assertTrue(operator.hasNonEmptySharedBuffer(43));
harness.processElement(new StreamRecord<>(startEvent2, 4L));
harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
@@ -1224,7 +1225,7 @@ public class CEPOperatorTest extends TestLogger {
// priority queue in CEP operator are correctly checkpointed/restored
.within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
@@ -1275,7 +1276,7 @@ public class CEPOperatorTest extends TestLogger {
}
}).within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
@@ -1319,7 +1320,7 @@ public class CEPOperatorTest extends TestLogger {
}
}).within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 8a73556..c4b147c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -26,8 +26,10 @@ import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -366,20 +368,23 @@ public class CEPRescalingTest {
}
private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
- int maxParallelism,
- int taskParallelism,
- int subtaskIdx) throws Exception {
+ int maxParallelism,
+ int taskParallelism,
+ int subtaskIdx) throws Exception {
KeySelector<Event, Integer> keySelector = new TestKeySelector();
- return new KeyedOneInputStreamOperatorTestHarness<>(
- getKeyedCepOpearator(
- false,
- new NFAFactory()),
- keySelector,
- BasicTypeInfo.INT_TYPE_INFO,
- maxParallelism,
- taskParallelism,
- subtaskIdx);
+ KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
+ new KeyedOneInputStreamOperatorTestHarness<>(
+ getKeyedCepOpearator(
+ false,
+ new NFAFactory()),
+ keySelector,
+ BasicTypeInfo.INT_TYPE_INFO,
+ maxParallelism,
+ taskParallelism,
+ subtaskIdx);
+ harness.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+ return harness;
}
private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
@@ -427,7 +432,7 @@ public class CEPRescalingTest {
// priority queue in CEP operator are correctly checkpointed/restored
.within(Time.milliseconds(10L));
- return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+ return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
index 3753f34..abc4b18 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
@@ -84,6 +84,8 @@ public class CepOperatorTestUtilities {
comparator,
null,
new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+ private static final long serialVersionUID = -7143807777582726991L;
+
@Override
public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
return pattern;
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
new file mode 100644
index 0000000..f2c6613
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+
+import static org.apache.flink.cep.nfa.compiler.NFACompiler.compileFactory;
+
+/**
+ * Utility methods for constructing NFA.
+ */
+public class NFAUtils {
+
+ /**
+ * Compiles the given pattern into a {@link NFA}.
+ *
+ * @param pattern Definition of sequence pattern
+ * @param timeoutHandling True if the NFA shall return timed out event patterns
+ * @param <T> Type of the input events
+ * @return Non-deterministic finite automaton representing the given pattern
+ */
+ public static <T> NFA<T> compile(Pattern<T, ?> pattern, boolean timeoutHandling) {
+ NFACompiler.NFAFactory<T> factory = compileFactory(pattern, timeoutHandling);
+ return factory.createNFA();
+ }
+
+ private NFAUtils() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
new file mode 100644
index 0000000..2c7b979
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Extends {@link SharedBuffer} with methods for checking the number of state accesses. It does not use a proper
+ * StateBackend, but uses stubs over java collections.
+ */
+public class TestSharedBuffer<V> extends SharedBuffer<V> {
+
+ private final MockKeyedStateStore keyedStateStore;
+
+ private TestSharedBuffer(MockKeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
+ super(stateStore, valueSerializer);
+ this.keyedStateStore = stateStore;
+ }
+
+ public long getStateWrites() {
+ return keyedStateStore.stateWrites;
+ }
+
+ public long getStateReads() {
+ return keyedStateStore.stateReads;
+ }
+
+ public long getStateAccesses() {
+ return getStateWrites() + getStateReads();
+ }
+
+ /**
+ * Creates instance of {@link TestSharedBuffer}.
+ *
+ * @param typeSerializer serializer used to serialize incoming events
+ * @param <T> type of incoming events
+ * @return TestSharedBuffer instance
+ */
+ public static <T> TestSharedBuffer<T> createTestBuffer(TypeSerializer<T> typeSerializer) {
+ return new TestSharedBuffer<>(new MockKeyedStateStore(), typeSerializer);
+ }
+
+ private static class MockKeyedStateStore implements KeyedStateStore {
+
+ private long stateWrites = 0;
+ private long stateReads = 0;
+
+ @Override
+ public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+ return new ValueState<T>() {
+
+ private T value;
+
+ @Override
+ public T value() throws IOException {
+ stateReads++;
+ return value;
+ }
+
+ @Override
+ public void update(T value) throws IOException {
+ stateWrites++;
+ this.value = value;
+ }
+
+ @Override
+ public void clear() {
+ this.value = null;
+ }
+ };
+ }
+
+ @Override
+ public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+ return new MapState<UK, UV>() {
+
+ private Map<UK, UV> values;
+
+ private Map<UK, UV> getOrSetMap() {
+ if (values == null) {
+ this.values = new HashMap<>();
+ }
+ return values;
+ }
+
+ @Override
+ public UV get(UK key) throws Exception {
+ stateReads++;
+ if (values == null) {
+ return null;
+ }
+
+ return values.get(key);
+ }
+
+ @Override
+ public void put(UK key, UV value) throws Exception {
+ stateWrites++;
+ getOrSetMap().put(key, value);
+ }
+
+ @Override
+ public void putAll(Map<UK, UV> map) throws Exception {
+ stateWrites++;
+ getOrSetMap().putAll(map);
+ }
+
+ @Override
+ public void remove(UK key) throws Exception {
+ if (values == null) {
+ return;
+ }
+
+ stateWrites++;
+ values.remove(key);
+ }
+
+ @Override
+ public boolean contains(UK key) throws Exception {
+ if (values == null) {
+ return false;
+ }
+
+ stateReads++;
+ return values.containsKey(key);
+ }
+
+ @Override
+ public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+ if (values == null) {
+ return Collections.emptyList();
+ }
+
+ return () -> new CountingIterator<>(values.entrySet().iterator());
+ }
+
+ @Override
+ public Iterable<UK> keys() throws Exception {
+ if (values == null) {
+ return Collections.emptyList();
+ }
+
+ return () -> new CountingIterator<>(values.keySet().iterator());
+ }
+
+ @Override
+ public Iterable<UV> values() throws Exception {
+ if (values == null) {
+ return Collections.emptyList();
+ }
+
+ return () -> new CountingIterator<>(values.values().iterator());
+ }
+
+ @Override
+ public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
+ if (values == null) {
+ return Iterators.emptyIterator();
+ }
+
+ return new CountingIterator<>(values.entrySet().iterator());
+ }
+
+ @Override
+ public void clear() {
+ stateWrites++;
+ this.values = null;
+ }
+ };
+ }
+
+ private class CountingIterator<T> implements Iterator<T> {
+
+ private final Iterator<T> iterator;
+
+ CountingIterator(Iterator<T> iterator) {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ stateReads++;
+ return iterator.next();
+ }
+ }
+ }
+
+}