You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 08:51:57 UTC
[1/3] flink git commit: [FLINK-6137] Activate strict checkstyle for
flink-cep
Repository: flink
Updated Branches:
refs/heads/master 4f50dc4df -> c9e574bf3
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
new file mode 100644
index 0000000..7bf0767
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.primitives.Doubles;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility methods for IT tests of {@link NFA}.
+ */
+public class NFATestUtilities {
+
+	/** Shared, stateless comparator that brings the events of one match into a canonical order. */
+	private static final Comparator<Event> EVENT_COMPARATOR = new EventComparator();
+
+	/** Shared, stateless comparator that brings whole matches into a canonical order. */
+	private static final Comparator<List<Event>> LIST_EVENT_COMPARATOR = new ListEventComparator();
+
+	/**
+	 * Feeds the given events one by one to the NFA and collects every match it emits.
+	 *
+	 * @param inputEvents records whose value and timestamp are passed to {@code nfa.process}
+	 * @param nfa the NFA under test
+	 * @return one list per emitted match, holding the events of all map values of that match
+	 */
+	public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, List<Event>>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, List<Event>> p: patterns) {
+				// Flatten the per-key event lists of a match into a single list.
+				List<Event> res = new ArrayList<>();
+				for (List<Event> le: p.values()) {
+					res.addAll(le);
+				}
+				resultingPatterns.add(res);
+			}
+		}
+		return resultingPatterns;
+	}
+
+	/**
+	 * Asserts that both arguments contain the same matches, ignoring the order of the matches
+	 * and the order of the events inside a match.
+	 *
+	 * <p>The comparison works on sorted copies, so neither argument is modified and both may
+	 * be unmodifiable lists.
+	 *
+	 * @param actual matches produced by the NFA (note: first parameter)
+	 * @param expected matches the test expects (note: second parameter)
+	 */
+	public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
+		Assert.assertEquals(expected.size(), actual.size());
+		Assert.assertArrayEquals(normalize(expected).toArray(), normalize(actual).toArray());
+	}
+
+	/**
+	 * Returns a deep copy of the given matches in which the events of every match are sorted
+	 * and the matches themselves are sorted.
+	 */
+	private static List<List<Event>> normalize(List<List<Event>> matches) {
+		List<List<Event>> sorted = new ArrayList<>(matches.size());
+		for (List<Event> match : matches) {
+			List<Event> copy = new ArrayList<>(match);
+			Collections.sort(copy, EVENT_COMPARATOR);
+			sorted.add(copy);
+		}
+		Collections.sort(sorted, LIST_EVENT_COMPARATOR);
+		return sorted;
+	}
+
+	/**
+	 * Orders matches by size first and, for equal sizes, element by element using
+	 * {@link EventComparator}.
+	 */
+	private static class ListEventComparator implements Comparator<List<Event>> {
+
+		@Override
+		public int compare(List<Event> o1, List<Event> o2) {
+			int sizeComp = Integer.compare(o1.size(), o2.size());
+			if (sizeComp != 0) {
+				return sizeComp;
+			}
+			for (int i = 0; i < o1.size(); i++) {
+				int eventComp = EVENT_COMPARATOR.compare(o1.get(i), o2.get(i));
+				if (eventComp != 0) {
+					return eventComp;
+				}
+			}
+			return 0;
+		}
+	}
+
+	/**
+	 * Orders events by name, then by price, then by id.
+	 */
+	private static class EventComparator implements Comparator<Event> {
+
+		@Override
+		public int compare(Event o1, Event o2) {
+			int nameComp = o1.getName().compareTo(o2.getName());
+			if (nameComp != 0) {
+				return nameComp;
+			}
+			int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
+			if (priceComp != 0) {
+				return priceComp;
+			}
+			return Integer.compare(o1.getId(), o2.getId());
+		}
+	}
+
+	private NFATestUtilities() {
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
new file mode 100644
index 0000000..3b95eb4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Pattern#notFollowedBy(String)} and {@link Pattern#notNext(String)}.
+ */
+@SuppressWarnings("unchecked")
+public class NotPatternITCase extends TestLogger {
+
+ /**
+ * Pattern: "start" (a), notNext "notPattern" (b), followedByAny "middle" (c), followedBy "end" (d).
+ * Input order: a1, c1, b1, c2, d, so b1 does not come directly after a1.
+ * Expects exactly the matches (a1, c1, d) and (a1, c2, d).
+ */
+ @Test
+ public void testNotNext() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5167288560432018992L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2242479288129905510L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 1404509325548220892L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -8907427230007830915L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ // compareMaps ignores the order of matches and of events inside a match.
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d),
+ Lists.newArrayList(a1, c2, d)
+ ));
+ }
+
+ /**
+ * Pattern: "start" (a), notNext "notPattern" (b), followedBy "middle" (c), followedBy "end" (d).
+ * Input order: a1, b1, c1, c2, d, so b1 comes directly after a1.
+ * Expects no match at all.
+ */
+ @Test
+ public void testNotNextNoMatches() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -339500190577666439L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -6913980632538046451L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 3332196998905139891L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2086563479959018387L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ assertEquals(0, matches.size());
+ }
+
+ /**
+ * Pattern: "start" (a), followedByAny "middle" (c), followedByAny "end" (d), and a trailing
+ * notNext "notPattern" (b) as the last pattern element.
+ * Input order: a1, c1, c2, d, b1, so b1 comes directly after d.
+ * Expects no match at all.
+ */
+ @Test
+ public void testNotNextNoMatchesAtTheEnd() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+ Event b1 = new Event(42, "b", 3.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(c2, 3));
+ inputEvents.add(new StreamRecord<>(d, 4));
+ inputEvents.add(new StreamRecord<>(b1, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 1672995058886176627L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6003621617520261554L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 887700237024758417L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ }).notNext("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5239529076086933032L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ assertEquals(0, matches.size());
+ }
+
+ /**
+ * Pattern: "start" (a), notFollowedBy "notPattern" (b), followedByAny "middle" (c),
+ * followedBy "end" (d).
+ * Input order: a1, c1, b1, c2, d, so c1 arrives before b1 and c2 after it.
+ * Expects exactly one match, (a1, c1, d).
+ */
+ @Test
+ public void testNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -2641662468313191976L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -3632144132379494778L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 3818766882138348167L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2033204730795451288L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d)
+ ));
+ }
+
+ /**
+ * Same events and pattern as {@code testNotFollowedBy}, except that the "middle" (c) stage
+ * is marked optional().
+ * Input order: a1, c1, b1, c2, d.
+ * Expects exactly one match, (a1, c1, d).
+ */
+ @Test
+ public void testNotFollowedByBeforeOptional() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -2454396370205097543L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2749547391611263290L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -4989511337298217255L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -8466223836652936608L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, d)
+ ));
+ }
+
+ /**
+ * Pattern: "start" (a), followedByAny "middle" (b) with times(2), notFollowedBy "notPattern" (c),
+ * followedBy "end" (d).
+ * Input order: a1, b1, c, b2, d, so the c event arrives between the two b events.
+ * Expects no match at all.
+ */
+ @Test
+ public void testTimesWithNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(41, "b", 2.0);
+ Event c = new Event(42, "c", 3.0);
+ Event b2 = new Event(43, "b", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c, 3));
+ inputEvents.add(new StreamRecord<>(b2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -2568839911852184515L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -3632232424064269636L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 3685596793523534611L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 1960758663575587243L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ // An empty expected list: compareMaps then only checks that no match was produced.
+ compareMaps(matches, Lists.<List<Event>>newArrayList());
+ }
+
+ /**
+ * Pattern: "start" (a), notFollowedBy "notPattern" (b), followedByAny "middle" (c) with
+ * times(2).optional(), followedBy "end" (d).
+ * Input order (as added to the list): a1, d1, e, b1, c1, c2, d2.
+ * Expects exactly one match, (a1, d1), i.e. one without any "middle" event.
+ */
+ @Test
+ public void testIgnoreStateOfTimesWithNotFollowedBy() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event e = new Event(41, "e", 2.0);
+ Event c1 = new Event(42, "c", 3.0);
+ Event b1 = new Event(43, "b", 4.0);
+ Event c2 = new Event(44, "c", 5.0);
+ Event d1 = new Event(45, "d", 6.0);
+ Event d2 = new Event(46, "d", 7.0);
+
+ // NOTE(review): the record timestamps (1, 2, 1, 3, 2, 4, 5) are not monotonically
+ // increasing and the events are added in a different order than declared - confirm intended.
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(d1, 2));
+ inputEvents.add(new StreamRecord<>(e, 1));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d2, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2814850350025111940L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 4988756153568853834L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -225909103322018778L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -924294627956373696L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, d1)
+ ));
+ }
+
+ /**
+ * Pattern: "start" (a), followedByAny "middle" (b) with times(2), notFollowedBy "notPattern" (c),
+ * followedBy "end" (d).
+ * Input order (as added to the list): a1, d1, e, b1, b2, c1, d2, so c1 arrives after both
+ * b events and before d2.
+ * Expects no match at all.
+ */
+ @Test
+ public void testTimesWithNotFollowedByAfter() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event e = new Event(41, "e", 2.0);
+ Event c1 = new Event(42, "c", 3.0);
+ Event b1 = new Event(43, "b", 4.0);
+ Event b2 = new Event(44, "b", 5.0);
+ Event d1 = new Event(46, "d", 7.0);
+ Event d2 = new Event(47, "d", 8.0);
+
+ // NOTE(review): the record timestamps (1, 2, 1, 3, 3, 2, 5) are not monotonically
+ // increasing - confirm this is intended.
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(d1, 2));
+ inputEvents.add(new StreamRecord<>(e, 1));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(b2, 3));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(d2, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6193105689601702341L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5195859580923169111L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 4973027956103783831L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 2724622546678984894L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ // An empty expected list: compareMaps then only checks that no match was produced.
+ compareMaps(matches, Lists.<List<Event>>newArrayList());
+ }
+
+ /**
+ * Pattern: "start" (a), notFollowedBy "notPattern" (b), followedByAny "end" (c) marked
+ * optional() as the last pattern element.
+ * Input order: a1, c1, b1, c2.
+ * Expects the matches (a1, c1) and (a1); c2, which arrives after b1, is in neither.
+ */
+ @Test
+ public void testNotFollowedByBeforeOptionalAtTheEnd() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -4289351792573443294L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -4989574608417523507L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -5940131818629290579L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).optional();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1),
+ Lists.newArrayList(a1)
+ ));
+ }
+
+ /**
+ * Pattern: "start" (a), notFollowedBy "notPattern" (b), followedByAny "middle" (c) with
+ * times(2).optional(), followedBy "end" (d).
+ * Input order: a1, c1, b1, c2, d.
+ * Expects exactly one match, (a1, c1, c2, d).
+ */
+ @Test
+ public void testNotFollowedByBeforeOptionalTimes() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c2 = new Event(43, "c", 4.0);
+ Event d = new Event(43, "d", 4.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(c1, 2));
+ inputEvents.add(new StreamRecord<>(b1, 3));
+ inputEvents.add(new StreamRecord<>(c2, 4));
+ inputEvents.add(new StreamRecord<>(d, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -7885381452276160322L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 3471511260235826653L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 9073793782452363833L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7972902718259767076L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1, c1, c2, d)
+ ));
+ }
+
+ /**
+ * Pattern: "start" (a), notFollowedBy "notPattern" (b), followedBy "middle" (c),
+ * followedBy "end" (d).
+ * Input order: a1, b1, c1, a2, c2, d, so a b event follows a1 but none follows a2.
+ * Expects exactly one match, (a2, c2, d).
+ */
+ @Test
+ public void testNotFollowedByWithBranchingAtStart() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(42, "b", 3.0);
+ Event c1 = new Event(41, "c", 2.0);
+ Event a2 = new Event(41, "a", 4.0);
+ Event c2 = new Event(43, "c", 5.0);
+ Event d = new Event(43, "d", 6.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(c1, 3));
+ inputEvents.add(new StreamRecord<>(a2, 4));
+ inputEvents.add(new StreamRecord<>(c2, 5));
+ inputEvents.add(new StreamRecord<>(d, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -7866220136345465444L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 4957837489028234932L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5569569968862808007L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = -8579678167937416269L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a2, c2, d)
+ ));
+ }
+
+ /**
+ * Event fixtures shared by the oneOrMore tests below.
+ *
+ * <p>Note that B_2 to B_6 all carry id 42 (they differ in price only) and that C_1 and D_1
+ * both carry id 43.
+ */
+ private static class NotFollowByData {
+ static final Event A_1 = new Event(40, "a", 1.0);
+ static final Event B_1 = new Event(41, "b", 2.0);
+ static final Event B_2 = new Event(42, "b", 3.0);
+ static final Event B_3 = new Event(42, "b", 4.0);
+ static final Event C_1 = new Event(43, "c", 5.0);
+ static final Event B_4 = new Event(42, "b", 6.0);
+ static final Event B_5 = new Event(42, "b", 7.0);
+ static final Event B_6 = new Event(42, "b", 8.0);
+ static final Event D_1 = new Event(43, "d", 9.0);
+
+ // Constants holder: not meant to be instantiated.
+ private NotFollowByData() {
+ }
+ }
+
+	/**
+	 * Runs the notNext-after-oneOrMore scenario with the "b*" stage attached via followedBy
+	 * and expects no match.
+	 */
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillNext() {
+		assertEquals(0, testNotNextAfterOneOrMore(false).size());
+	}
+
+	/**
+	 * Runs the notNext-after-oneOrMore scenario with the "b*" stage attached via followedByAny
+	 * and expects the single match (A_1, B_2, D_1).
+	 */
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillAny() {
+		final List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_2, NotFollowByData.D_1)
+		);
+
+		compareMaps(testNotNextAfterOneOrMore(true), expected);
+	}
+
+	/**
+	 * Runs the pattern "a", "b*" (b, oneOrMore), notNext "not c", followedBy "d" against the
+	 * input A_1, B_1, C_1, B_2, D_1 and returns whatever the NFA emits.
+	 *
+	 * @param allMatches if {@code true} the "b*" stage is attached with followedByAny,
+	 *                   otherwise with followedBy
+	 * @return the matches collected by {@code feedNFA}
+	 */
+	private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+		// Last record: no increment, the counter is not read afterwards
+		// (same convention as testNotFollowedByAfterOneOrMore).
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore()
+			.notNext("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	/**
+	 * Runs the notFollowedBy-after-oneOrMore scenario with plain oneOrMore() and the "b*"
+	 * stage attached via followedBy, and expects no match.
+	 */
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreEager() {
+		assertEquals(0, testNotFollowedByAfterOneOrMore(true, false).size());
+	}
+
+	/**
+	 * Runs the notFollowedBy-after-oneOrMore scenario with plain oneOrMore() and the "b*"
+	 * stage attached via followedByAny. The six expected matches only contain b events that
+	 * were fed after C_1 (B_4, B_5, B_6).
+	 */
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreEager() {
+		final List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		);
+
+		compareMaps(testNotFollowedByAfterOneOrMore(true, true), expected);
+	}
+
+	/**
+	 * Runs the notFollowedBy-after-oneOrMore scenario with oneOrMore().allowCombinations() and
+	 * the "b*" stage attached via followedBy, and expects no match.
+	 */
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreCombinations() {
+		assertEquals(0, testNotFollowedByAfterOneOrMore(false, false).size());
+	}
+
+	/**
+	 * Runs the notFollowedBy-after-oneOrMore scenario with oneOrMore().allowCombinations() and
+	 * the "b*" stage attached via followedByAny. Compared with the eager variant the expected
+	 * result additionally contains the non-contiguous combination (A_1, B_4, B_6, D_1).
+	 */
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
+		final List<List<Event>> expected = Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		);
+
+		compareMaps(testNotFollowedByAfterOneOrMore(false, true), expected);
+	}
+
+ /**
+ * Runs the pattern "a", "b*" (b, oneOrMore), notFollowedBy "not c", followedBy "d" against the
+ * input A_1, B_1, B_2, B_3, C_1, B_4, B_5, B_6, D_1 and returns whatever the NFA emits.
+ *
+ * @param eager if {@code true} the "b*" stage uses plain oneOrMore(), otherwise
+ * oneOrMore().allowCombinations()
+ * @param allMatches if {@code true} the "b*" stage is attached with followedByAny, otherwise
+ * with followedBy
+ * @return the matches collected by {@code feedNFA}
+ */
+ private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ // The running counter i supplies the record timestamps 0..8.
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_3, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ });
+
+ // Contiguity between "a" and "b*" is selected by allMatches.
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ // The quantifier of "b*" is selected by eager; the negative "not c" stage and "d" follow.
+ pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ })
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotFollowedByAnyBeforeOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOneOrMoreEager() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeOneOrMoreCombinations() {
+ final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
+
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore();
+
+ pattern = (eager ? pattern : pattern.allowCombinations())
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+ ));
+ }
+
+ @Test
+ public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
+ final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
+ compareMaps(matches, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+ Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+ ));
+ }
+
+ private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ int i = 0;
+ inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+ inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+ Pattern<Event, ?> pattern = Pattern
+ .<Event>begin("a").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("not c").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ });
+
+ pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+ .where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).oneOrMore().optional();
+
+ pattern = (eager ? pattern : pattern.allowCombinations())
+ .followedBy("d").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("d");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
new file mode 100644
index 0000000..d378a74
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for handling events that are equal according to {@link Object#equals(Object)} and have the same timestamps.
+ */
+@SuppressWarnings("unchecked")
+public class SameElementITCase extends TestLogger {
+
+ @Test
+ public void testEagerZeroOrMoreSameElement() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(end1, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
+ Lists.newArrayList(startEvent, middleEvent1, end1),
+ Lists.newArrayList(startEvent, end1)
+ ));
+ }
+
+ @Test
+ public void testZeroOrMoreSameElement() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent1a = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event middleEvent3a = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+ inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
+ inputEvents.add(new StreamRecord<>(end1, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
+
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
+ Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
+
+ Lists.newArrayList(startEvent, middleEvent1, end1),
+ Lists.newArrayList(startEvent, middleEvent1a, end1),
+ Lists.newArrayList(startEvent, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent3a, end1),
+
+ Lists.newArrayList(startEvent, end1)
+ ));
+ }
+
+ @Test
+ public void testSimplePatternWSameElement() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(end1, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, end1),
+ Lists.newArrayList(startEvent, middleEvent1, end1)
+ ));
+ }
+
+ @Test
+ public void testIterativeConditionWSameElement() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent1a = new Event(41, "a", 2.0);
+ Event middleEvent1b = new Event(41, "a", 2.0);
+ final Event end = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+ inputEvents.add(new StreamRecord<>(end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
+
+ private static final long serialVersionUID = -5566639743229703237L;
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ double sum = 0.0;
+ for (Event event: ctx.getEventsForPattern("middle")) {
+ sum += event.getPrice();
+ }
+ return Double.compare(sum, 4.0) == 0;
+ }
+
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
+ ));
+ }
+
+ @Test
+ public void testEndWLoopingWSameElement() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent1a = new Event(41, "a", 2.0);
+ Event middleEvent1b = new Event(41, "a", 2.0);
+ final Event end = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+ inputEvents.add(new StreamRecord<>(end, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedByAny("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().optional();
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent),
+ Lists.newArrayList(startEvent, middleEvent1),
+ Lists.newArrayList(startEvent, middleEvent1a),
+ Lists.newArrayList(startEvent, middleEvent1b),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
+ Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
+ ));
+ }
+
+ @Test
+ public void testRepeatingPatternWSameElement() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middle1Event1 = new Event(40, "a", 2.0);
+ Event middle1Event2 = new Event(40, "a", 3.0);
+ Event middle1Event3 = new Event(40, "a", 4.0);
+ Event middle2Event1 = new Event(40, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+ inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+ inputEvents.add(new StreamRecord<>(middle1Event2, 3));
+ inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
+ inputEvents.add(new StreamRecord<>(middle2Event1, 6));
+ inputEvents.add(new StreamRecord<>(middle1Event3, 7));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ }).optional().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middle1Event1),
+
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
+ Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
+
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
+ Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
+
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
+
+ Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
+ ));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index bd828b6..44033c1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -18,20 +18,19 @@
package org.apache.flink.cep.nfa;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.cep.Event;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.Collection;
import java.util.Collections;
@@ -39,6 +38,9 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link SharedBuffer}.
+ */
public class SharedBufferTest extends TestLogger {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 26b8ce9..cd12071 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.util.TestLogger;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -45,6 +46,9 @@ import static com.google.common.collect.Sets.newHashSet;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link NFACompiler}.
+ */
public class NFACompilerTest extends TestLogger {
private static final SimpleCondition<Event> startFilter = new SimpleCondition<Event>() {
@@ -116,7 +120,7 @@ public class NFACompilerTest extends TestLogger {
}
/**
- * Tests that the NFACompiler generates the correct NFA from a given Pattern
+ * Tests that the NFACompiler generates the correct NFA from a given Pattern.
*/
@Test
public void testNFACompilerWithSimplePattern() {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index 0345192..f5a909b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.cep.operator;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -463,7 +465,6 @@ public class CEPFrom12MigrationTest {
}
}
-
@Test
public void testSinglePatternAfterMigration() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index c92f772..69ba42f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
import org.junit.Test;
import java.net.URL;
@@ -45,6 +46,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for migration from 1.1.x to 1.3.x.
+ */
public class CEPMigration11to13Test {
private static String getResourceFilename(String filename) {
@@ -198,7 +202,7 @@ public class CEPMigration11to13Test {
final Event startEvent = new Event(42, "start", 1.0);
final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
- final Event endEvent= new Event(42, "end", 1.0);
+ final Event endEvent = new Event(42, "end", 1.0);
// uncomment these lines for regenerating the snapshot on Flink 1.1
/*
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..d83c191 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.cep.operator;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -42,13 +41,13 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-import static org.junit.Assert.*;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -57,6 +56,12 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}.
+ */
public class CEPOperatorTest extends TestLogger {
@Rule
@@ -269,7 +274,7 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(resultObject instanceof StreamRecord);
StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord =
- (StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>) resultObject;
+ (StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) resultObject;
assertTrue(streamRecord.getValue() instanceof Either.Left);
@@ -299,8 +304,8 @@ public class CEPOperatorTest extends TestLogger {
SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
- Event endEvent1 = new Event(42, "end", 1.0);
- Event endEvent2 = new Event(42, "end", 2.0);
+ Event endEvent1 = new Event(42, "end", 1.0);
+ Event endEvent2 = new Event(42, "end", 2.0);
Event startEventK2 = new Event(43, "start", 1.0);
@@ -493,8 +498,8 @@ public class CEPOperatorTest extends TestLogger {
SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
- Event endEvent1 = new Event(42, "end", 1.0);
- Event endEvent2 = new Event(42, "end", 2.0);
+ Event endEvent1 = new Event(42, "end", 1.0);
+ Event endEvent2 = new Event(42, "end", 2.0);
Event startEventK2 = new Event(43, "start", 1.0);
@@ -747,7 +752,6 @@ public class CEPOperatorTest extends TestLogger {
Assert.assertArrayEquals(expected.toArray(), actual.toArray());
}
-
private class ListEventComparator implements Comparator<List<Event>> {
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 86be09c..40514df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
import org.junit.Test;
import java.util.List;
@@ -44,6 +45,9 @@ import java.util.Queue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for rescaling of CEP operators.
+ */
public class CEPRescalingTest {
@Test
@@ -64,7 +68,7 @@ public class CEPRescalingTest {
Event startEvent1 = new Event(7, "start", 1.0);
SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0);
- Event endEvent1= new Event(7, "end", 1.0);
+ Event endEvent1 = new Event(7, "end", 1.0);
int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
assertEquals(1, keygroup);
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index b6fb484..e00384b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -25,15 +25,23 @@ import org.apache.flink.cep.pattern.conditions.OrCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+/**
+ * Tests for constructing {@link Pattern}.
+ */
public class PatternTest extends TestLogger {
+
/**
- * These test simply test that the pattern construction completes without failure
+ * These tests simply test that the pattern construction completes without failure.
*/
-
@Test
public void testStrictContiguity() {
Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end");
[3/3] flink git commit: [FLINK-6137] Activate strict checkstyle for
flink-cep
Posted by ch...@apache.org.
[FLINK-6137] Activate strict checkstyle for flink-cep
This closes #3976.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9e574bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9e574bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9e574bf
Branch: refs/heads/master
Commit: c9e574bf3206c16bd7e1be8a5672073d8846d7d7
Parents: 4f50dc4
Author: dawidwys <wy...@gmail.com>
Authored: Wed May 24 22:27:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 10:51:38 2017 +0200
----------------------------------------------------------------------
.../flink/cep/scala/pattern/Pattern.scala | 4 +-
flink-libraries/flink-cep/pom.xml | 36 +
.../src/main/java/org/apache/flink/cep/CEP.java | 2 +-
.../flink/cep/NonDuplicatingTypeSerializer.java | 12 +-
.../org/apache/flink/cep/PatternStream.java | 7 +-
.../org/apache/flink/cep/nfa/DeweyNumber.java | 6 +-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 52 +-
.../org/apache/flink/cep/nfa/SharedBuffer.java | 27 +-
.../java/org/apache/flink/cep/nfa/State.java | 12 +-
.../apache/flink/cep/nfa/StateTransition.java | 5 +
.../flink/cep/nfa/compiler/NFACompiler.java | 33 +-
.../AbstractKeyedCEPPatternOperator.java | 9 +-
.../flink/cep/operator/CEPOperatorUtils.java | 8 +-
.../cep/operator/StreamRecordComparator.java | 2 +-
.../flink/cep/pattern/AndFilterFunction.java | 7 +-
.../flink/cep/pattern/OrFilterFunction.java | 7 +-
.../org/apache/flink/cep/pattern/Pattern.java | 22 +-
.../apache/flink/cep/pattern/Quantifier.java | 22 +-
.../cep/pattern/SubtypeFilterFunction.java | 7 +-
.../pattern/conditions/BooleanConditions.java | 1 +
.../pattern/conditions/IterativeCondition.java | 4 +-
.../java/org/apache/flink/cep/CEPITCase.java | 20 +-
.../test/java/org/apache/flink/cep/Event.java | 3 +
.../java/org/apache/flink/cep/SubEvent.java | 3 +
.../apache/flink/cep/nfa/DeweyNumberTest.java | 4 +
.../cep/nfa/IterativeConditionsITCase.java | 421 ++++
.../org/apache/flink/cep/nfa/NFAITCase.java | 1948 +-----------------
.../java/org/apache/flink/cep/nfa/NFATest.java | 8 +-
.../apache/flink/cep/nfa/NFATestUtilities.java | 115 ++
.../apache/flink/cep/nfa/NotPatternITCase.java | 1036 ++++++++++
.../apache/flink/cep/nfa/SameElementITCase.java | 407 ++++
.../apache/flink/cep/nfa/SharedBufferTest.java | 10 +-
.../flink/cep/nfa/compiler/NFACompilerTest.java | 6 +-
.../cep/operator/CEPFrom12MigrationTest.java | 3 +-
.../cep/operator/CEPMigration11to13Test.java | 6 +-
.../flink/cep/operator/CEPOperatorTest.java | 22 +-
.../flink/cep/operator/CEPRescalingTest.java | 6 +-
.../apache/flink/cep/pattern/PatternTest.java | 14 +-
38 files changed, 2296 insertions(+), 2021 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index c77e70d..3a30836 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -295,7 +295,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
* {{{A1 A2 B}}} appears, this will generate patterns:
* {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
*
- * @return The same pattern with a [[Quantifier.ONE_OR_MORE()]] quantifier applied.
+ * @return The same pattern with a [[Quantifier.oneOrMore()]] quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
def oneOrMore: Pattern[T, F] = {
@@ -316,7 +316,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
}
/**
- * Applicable only to [[Quantifier.ONE_OR_MORE()]] and [[Quantifier.TIMES()]] patterns,
+ * Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
* this option allows more flexibility to the matching events.
*
* If {{{allowCombinations()}}} is not applied for a
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index 35045c0..6622b36 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -106,6 +106,42 @@ under the License.
</execution>
</executions>
</plugin>
+
+ <!-- We have stricter checkstyle rules than the rest of the project -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+ <suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ <logViolationsToConsole>true</logViolationsToConsole>
+ <failOnViolation>true</failOnViolation>
+ </configuration>
+ <executions>
+ <!--
+ Execute checkstyle after compilation but before tests.
+
+ This ensures that any parsing or type checking errors are from
+ javac, so they look as expected. Beyond that, we want to
+ fail as early as possible.
+ -->
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index 9ce9f77..0ef9c21 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
/**
* Utility class for complex event processing.
*
- * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
+ * <p>Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
*/
public class CEP {
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index 9e22fc2..f9e13fe 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -35,7 +35,7 @@ import java.util.IdentityHashMap;
* serialized once. If the same object shall be serialized again, then a reference handle is
* written instead.
*
- * Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
+ * <p>Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
* all serialized objects. To make the serializer work it is important that the same serializer
* is used for a coherent serialization run. After the serialization has stopped, the identity
* hash map should be cleared.
@@ -107,8 +107,8 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
/**
* Serializes the given record.
- * <p>
- * First a boolean indicating whether a reference handle (true) or the object (false) is
+ *
+ * <p>First a boolean indicating whether a reference handle (true) or the object (false) is
* written. Then, either the reference handle or the object is written.
*
* @param record The record to serialize.
@@ -128,8 +128,8 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
/**
* Deserializes an object from the input view.
- * <p>
- * First it reads a boolean indicating whether a reference handle or a serialized object
+ *
+ * <p>First it reads a boolean indicating whether a reference handle or a serialized object
* follows.
*
* @param source The input view from which to read the data.
@@ -172,7 +172,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
public boolean equals(Object obj) {
if (obj instanceof NonDuplicatingTypeSerializer) {
@SuppressWarnings("unchecked")
- NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>)obj;
+ NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>) obj;
return (other.canEqual(this) && typeSerializer.equals(other.typeSerializer));
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 5544689..71614cf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -40,7 +40,7 @@ import java.util.Map;
* {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
* has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}.
*
- * Additionally it allows to handle partially matched event patterns which have timed out. For this
+ * <p>Additionally, it allows handling partially matched event patterns which have timed out. For this
* the user has to specify a {@link PatternTimeoutFunction} or a {@link PatternFlatTimeoutFunction}.
*
* @param <T> Type of the events
@@ -119,7 +119,7 @@ public class PatternStream<T> {
* provided {@link PatternSelectFunction} is called. The pattern select function can produce
* exactly one resulting element.
*
- * Applies a timeout function to a partial pattern sequence which has timed out. For each
+ * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
* partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
* timeout function can produce exactly one resulting element.
*
@@ -220,7 +220,7 @@ public class PatternStream<T> {
* the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
* can produce an arbitrary number of resulting elements.
*
- * Applies a timeout function to a partial pattern sequence which has timed out. For each
+ * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
* partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The
* pattern timeout function can produce an arbitrary number of resulting elements.
*
@@ -397,7 +397,6 @@ public class PatternStream<T> {
this.patternFlatSelectFunction = patternFlatSelectFunction;
}
-
@Override
public void flatMap(Map<String, List<T>> value, Collector<R> out) throws Exception {
patternFlatSelectFunction.flatSelect(value, out);
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 3827956..f066141 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
/**
* Versioning scheme which allows to retrieve dependencies between different versions.
*
- * A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible
+ * <p>A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible
* to v' iff v contains v' as a prefix or if both dewey number differ only in the last digit and
* the last digit of v is greater than v'.
*
@@ -58,7 +58,7 @@ public class DeweyNumber implements Serializable {
/**
* Checks whether this dewey number is compatible to the other dewey number.
*
- * True iff this contains other as a prefix or iff they differ only in the last digit whereas
+ * <p>True iff this contains other as a prefix or iff they differ only in the last digit whereas
* the last digit of this is greater than the last digit of other.
*
* @param other The other dewey number to check compatibility against
@@ -106,7 +106,7 @@ public class DeweyNumber implements Serializable {
/**
* Creates a new dewey number from this such that its last digit is increased by the supplied
- * number
+ * number.
*
* @param times how many times to increase the Dewey number
* @return A new dewey number derived from this whose last digit is increased by given number
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 2be09ad..f438915 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,9 +18,6 @@
package org.apache.flink.cep.nfa;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -29,9 +26,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.cep.NonDuplicatingTypeSerializer;
@@ -44,7 +41,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.ListMultimap;
+
import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -68,27 +70,26 @@ import java.util.Stack;
/**
* Non-deterministic finite automaton implementation.
- * <p>
- * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator}
+ *
+ * <p>The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator}
* keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
* When an event gets processed, it updates the NFA's internal state machine.
- * <p>
- * An event that belongs to a partially matched sequence is kept in an internal
+ *
+ * <p>An event that belongs to a partially matched sequence is kept in an internal
* {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for
* this purpose. Events in the buffer are removed when all the matched sequences that
* contain them are:
* <ol>
- * <li>emitted (success)</li>
- * <li>discarded (patterns containing NOT)</li>
- * <li>timed-out (windowed patterns)</li>
+ * <li>emitted (success)</li>
+ * <li>discarded (patterns containing NOT)</li>
+ * <li>timed-out (windowed patterns)</li>
* </ol>
*
- * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
- *
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
- * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
*
* @param <T> Type of the processed events
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
*/
public class NFA<T> implements Serializable {
@@ -289,7 +290,7 @@ public class NFA<T> implements Serializable {
}
// prune shared buffer based on window length
- if(windowTime > 0L) {
+ if (windowTime > 0L) {
long pruningTimestamp = timestamp - windowTime;
if (pruningTimestamp < timestamp) {
@@ -360,6 +361,7 @@ public class NFA<T> implements Serializable {
int getTotalIgnoreBranches() {
return totalIgnoreBranches;
}
+
int getTotalTakeBranches() {
return totalTakeBranches;
}
@@ -672,7 +674,7 @@ public class NFA<T> implements Serializable {
////////////////////// Fault-Tolerance / Migration //////////////////////
- private final static String BEGINNING_STATE_NAME = "$beginningState$";
+ private static final String BEGINNING_STATE_NAME = "$beginningState$";
private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
ois.defaultReadObject();
@@ -747,7 +749,6 @@ public class NFA<T> implements Serializable {
}
}).getTargetState().getName();
-
final State<T> previousState = convertedStates.get(previousName);
computationStates.add(ComputationState.createState(
@@ -783,16 +784,16 @@ public class NFA<T> implements Serializable {
@SuppressWarnings("unchecked")
private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
- final State<T> state = (State<T>)ois.readObject();
+ final State<T> state = (State<T>) ois.readObject();
State<T> previousState;
try {
- previousState = (State<T>)ois.readObject();
+ previousState = (State<T>) ois.readObject();
} catch (OptionalDataException e) {
previousState = null;
}
final long timestamp = ois.readLong();
- final DeweyNumber version = (DeweyNumber)ois.readObject();
+ final DeweyNumber version = (DeweyNumber) ois.readObject();
final long startTimestamp = ois.readLong();
final boolean hasEvent = ois.readBoolean();
@@ -915,7 +916,7 @@ public class NFA<T> implements Serializable {
serializeStates(record.states, target);
target.writeLong(record.windowTime);
target.writeBoolean(record.handleTimeout);
-
+
sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
target.writeInt(record.computationStates.size());
@@ -948,10 +949,10 @@ public class NFA<T> implements Serializable {
Set<State<T>> states = deserializeStates(source);
long windowTime = source.readLong();
boolean handleTimeout = source.readBoolean();
-
+
NFA<T> nfa = new NFA<>(eventSerializer, windowTime, handleTimeout);
nfa.states = states;
-
+
nfa.eventSharedBuffer = sharedBufferSerializer.deserialize(source);
Queue<ComputationState<T>> computationStates = new LinkedList<>();
@@ -1132,7 +1133,6 @@ public class NFA<T> implements Serializable {
TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
-
final int noOfStates = in.readInt();
Map<String, State<T>> states = new HashMap<>(noOfStates);
@@ -1250,7 +1250,7 @@ public class NFA<T> implements Serializable {
ois.close();
bais.close();
return copy;
- } catch (IOException|ClassNotFoundException e) {
+ } catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("Could not copy NFA.", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 91fce1f..a44b333 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,9 +18,6 @@
package org.apache.flink.cep.nfa;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
@@ -37,6 +34,10 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.util.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.commons.lang3.StringUtils;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
@@ -57,15 +58,15 @@ import java.util.Stack;
/**
* A shared buffer implementation which stores values under a key. Additionally, the values can be
* versioned such that it is possible to retrieve their predecessor element in the buffer.
- * <p>
- * The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
+ *
+ * <p>The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
* buffer page maintains a collection of the inserted values.
*
- * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
+ * <p>The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
* relations between different entries. A dewey versioning scheme allows to discriminate between
* different relations (e.g. preceding element).
*
- * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
*
* @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
* https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
@@ -245,7 +246,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
if (currentEntry == null) {
final ListMultimap<K, V> completePath = ArrayListMultimap.create();
- while(!currentPath.isEmpty()) {
+ while (!currentPath.isEmpty()) {
final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
@@ -398,7 +399,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
public String toString() {
StringBuilder builder = new StringBuilder();
- for(Map.Entry<K, SharedBufferPage<K, V>> entry: pages.entrySet()){
+ for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
builder.append("Key: ").append(entry.getKey()).append("\n");
builder.append("Value: ").append(entry.getValue()).append("\n");
}
@@ -644,7 +645,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
}
/**
- * Versioned edge between two shared buffer entries
+ * Versioned edge between two shared buffer entries.
*
* @param <K> Type of the key
* @param <V> Type of the value
@@ -747,7 +748,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
public boolean equals(Object obj) {
if (obj instanceof ValueTimeWrapper) {
@SuppressWarnings("unchecked")
- ValueTimeWrapper<V> other = (ValueTimeWrapper<V>)obj;
+ ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
return timestamp == other.getTimestamp() && value.equals(other.getValue()) && counter == other.getCounter();
} else {
@@ -928,7 +929,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
// key for the current page
keySerializer.serialize(page.getKey(), target);
-
+
// number of page entries
target.writeInt(page.entries.size());
@@ -1182,7 +1183,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
for (int i = 0; i < numberPages; i++) {
// key of the page
@SuppressWarnings("unchecked")
- K key = (K)ois.readObject();
+ K key = (K) ois.readObject();
SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 14395b1..035674c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -30,8 +30,8 @@ import java.util.Objects;
/**
* Represents a state of the {@link NFA}.
- * <p>
- * Each state is identified by a name and a state type. Furthermore, it contains a collection of
+ *
+ * <p>Each state is identified by a name and a state type. Furthermore, it contains a collection of
* state transitions. The state transitions describe under which conditions it is possible to enter
* a new state.
*
@@ -59,7 +59,9 @@ public class State<T> implements Serializable {
return stateType == StateType.Final;
}
- public boolean isStart() { return stateType == StateType.Start; }
+ public boolean isStart() {
+ return stateType == StateType.Start;
+ }
public String getName() {
return name;
@@ -84,7 +86,7 @@ public class State<T> implements Serializable {
addStateTransition(StateTransitionAction.IGNORE, this, condition);
}
- public void addIgnore(final State<T> targetState,final IterativeCondition<T> condition) {
+ public void addIgnore(final State<T> targetState, final IterativeCondition<T> condition) {
addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
}
@@ -104,7 +106,7 @@ public class State<T> implements Serializable {
public boolean equals(Object obj) {
if (obj instanceof State) {
@SuppressWarnings("unchecked")
- State<T> other = (State<T>)obj;
+ State<T> other = (State<T>) obj;
return name.equals(other.name) &&
stateType == other.stateType &&
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index c6850cc..bb61e09 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -25,6 +25,11 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import java.io.Serializable;
import java.util.Objects;
+/**
+ * Represents a transition from one {@link State} to another.
+ *
+ * @param <T> type of events that are handled by the {@link IterativeCondition}
+ */
public class StateTransition<T> implements Serializable {
private static final long serialVersionUID = -4825345749997891838L;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 1b31485..8d1d366 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,18 +18,6 @@
package org.apache.flink.cep.nfa.compiler;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,6 +34,21 @@ import org.apache.flink.cep.pattern.conditions.NotCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
* {@link NFAFactory}.
@@ -396,7 +399,7 @@ public class NFACompiler {
/**
* Creates a "complex" state consisting of given number of states with
- * same {@link IterativeCondition}
+ * same {@link IterativeCondition}.
*
* @param sinkState the state that the created state should point to
* @param times number of times the state should be copied
@@ -720,8 +723,8 @@ public class NFACompiler {
/**
* Implementation of the {@link NFAFactory} interface.
- * <p>
- * The implementation takes the input type serializer, the window time and the set of
+ *
+ * <p>The implementation takes the input type serializer, the window time and the set of
* states and their transitions to be able to create an NFA from them.
*
* @param <T> Type of the input events which are processed by the NFA
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index af4b53e..2e3aefd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.core.fs.FSDataInputStream;
@@ -40,13 +40,13 @@ import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -337,7 +337,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
)
);
-
if (migratingFromOldKeyedOperator) {
int numberEntries = inputView.readInt();
for (int i = 0; i < numberEntries; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index e7b7e65..d00e5e9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -39,6 +40,9 @@ import org.apache.flink.types.Either;
import java.util.List;
import java.util.Map;
+/**
+ * Utility methods for creating {@link PatternStream}.
+ */
public class CEPOperatorUtils {
/**
@@ -61,7 +65,7 @@ public class CEPOperatorUtils {
if (inputStream instanceof KeyedStream) {
// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
- KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+ KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
@@ -122,7 +126,7 @@ public class CEPOperatorUtils {
if (inputStream instanceof KeyedStream) {
// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
- KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+ KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
index b290e7b..30fbc26 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
import java.util.Comparator;
/**
- * Compares two {@link StreamRecord}s based on their timestamp
+ * Compares two {@link StreamRecord}s based on their timestamp.
*
* @param <IN> Type of the value field of the StreamRecord
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
index a7391d5..ef3071f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
import org.apache.flink.api.common.functions.FilterFunction;
/**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead.
- *
- * <p>A filter function which combines two filter functions with a logical and. Thus, the filter
+ * A filter function which combines two filter functions with a logical and. Thus, the filter
* function only returns true, iff both filters return true.
*
* @param <T> Type of the element to filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead.
*/
@Deprecated
public class AndFilterFunction<T> implements FilterFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
index 3620cae..d1c406a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
import org.apache.flink.api.common.functions.FilterFunction;
/**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead.
- *
- * <p>A filter function which combines two filter functions with a logical or. Thus, the filter
+ * A filter function which combines two filter functions with a logical or. Thus, the filter
* function only returns true, iff at least one of the filter functions holds true.
*
* @param <T> Type of the element to filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead.
*/
@Deprecated
public class OrFilterFunction<T> implements FilterFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2d10b41..2676994 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -30,10 +30,10 @@ import org.apache.flink.util.Preconditions;
/**
* Base class for a pattern definition.
- * <p>
- * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}.
*
- * <pre>{@code
+ * <p>A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}.
+ *
+ * <p><pre>{@code
* Pattern<T, F> pattern = Pattern.<T>begin("start")
* .next("middle").subtype(F.class)
* .followedBy("end").where(new MyCondition());
@@ -57,8 +57,8 @@ public class Pattern<T, F extends T> {
/** Window length in which the pattern match has to occur. */
private Time windowTime;
- /** A quantifier for the pattern. By default set to {@link Quantifier#ONE(ConsumingStrategy)}. */
- private Quantifier quantifier = Quantifier.ONE(ConsumingStrategy.STRICT);
+ /** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}. */
+ private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
/**
* Applicable to a {@code times} pattern, and holds
@@ -77,7 +77,7 @@ public class Pattern<T, F extends T> {
final ConsumingStrategy consumingStrategy) {
this.name = name;
this.previous = previous;
- this.quantifier = Quantifier.ONE(consumingStrategy);
+ this.quantifier = Quantifier.one(consumingStrategy);
}
public Pattern<T, ? extends T> getPrevious() {
@@ -295,13 +295,13 @@ public class Pattern<T, F extends T> {
* {@code A1 A2 B} appears, this will generate patterns:
* {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
*
- * @return The same pattern with a {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied.
+ * @return The same pattern with a {@link Quantifier#oneOrMore(ConsumingStrategy)} quantifier applied.
* @throws MalformedPatternException if the quantifier is not applicable to this pattern.
*/
public Pattern<T, F> oneOrMore() {
checkIfNoNotPattern();
checkIfQuantifierApplied();
- this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
+ this.quantifier = Quantifier.oneOrMore(quantifier.getConsumingStrategy());
return this;
}
@@ -317,14 +317,14 @@ public class Pattern<T, F extends T> {
checkIfNoNotPattern();
checkIfQuantifierApplied();
Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
- this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
+ this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
this.times = times;
return this;
}
/**
- * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} and
- * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
+ * Applicable only to {@link Quantifier#oneOrMore(ConsumingStrategy)} and
+ * {@link Quantifier#times(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
*
* <p>If {@code allowCombinations()} is not applied for a
* pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 382c3ba..efc7cf4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -15,11 +15,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.cep.pattern;
import java.util.EnumSet;
import java.util.Objects;
+/**
+ * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.
+ *
+ * <p><ol>
+ * <li>Single</li>
+ * <li>Looping</li>
+ * <li>Times</li>
+ * </ol>
+ *
+ * <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times also have an
+ * additional inner consuming strategy that is applied between accepted events in the pattern.
+ */
public class Quantifier {
private final EnumSet<QuantifierProperty> properties;
@@ -36,15 +49,15 @@ public class Quantifier {
this.consumingStrategy = consumingStrategy;
}
- public static Quantifier ONE(final ConsumingStrategy consumingStrategy) {
+ public static Quantifier one(final ConsumingStrategy consumingStrategy) {
return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
}
- public static Quantifier ONE_OR_MORE(final ConsumingStrategy consumingStrategy) {
+ public static Quantifier oneOrMore(final ConsumingStrategy consumingStrategy) {
return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
}
- public static Quantifier TIMES(final ConsumingStrategy consumingStrategy) {
+ public static Quantifier times(final ConsumingStrategy consumingStrategy) {
return new Quantifier(consumingStrategy, QuantifierProperty.TIMES);
}
@@ -118,6 +131,9 @@ public class Quantifier {
OPTIONAL
}
+ /**
+ * Describes strategy for which events are matched in this {@link Pattern}. See docs for more info.
+ */
public enum ConsumingStrategy {
STRICT,
SKIP_TILL_NEXT,
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
index ae48df3..f5008c1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
import org.apache.flink.api.common.functions.FilterFunction;
/**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead.
- *
- * <p>A filter function which filters elements of the given type. A element if filtered out iff it
+ * A filter function which filters elements of the given type. An element is filtered out iff it
* is not assignable to the given subtype of T.
*
* @param <T> Type of the elements to be filtered
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead.
*/
@Deprecated
public class SubtypeFilterFunction<T> implements FilterFunction<T> {
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
index d67b407..aea5a3b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.flink.cep.pattern.conditions;
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index 016cdef..e7c814f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -65,8 +65,8 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
/**
* The filter function that evaluates the predicate.
- * <p>
- * <strong>IMPORTANT:</strong> The system assumes that the function does not
+ *
+ * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
* modify the elements on which the predicate is applied. Violating this assumption
* can lead to incorrect results.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 9a08659..66eeca8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.core.fs.FileSystem;
@@ -31,8 +32,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
import org.apache.flink.types.Either;
+
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -42,6 +43,9 @@ import org.junit.rules.TemporaryFolder;
import java.util.List;
import java.util.Map;
+/**
+ * End to end tests of both CEP operators and {@link NFA}.
+ */
@SuppressWarnings("serial")
public class CEPITCase extends StreamingMultipleProgramsTestBase {
@@ -70,7 +74,8 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
}
/**
- * Checks that a certain event sequence is recognized
+ * Checks that a certain event sequence is recognized.
+ *
* @throws Exception
*/
@Test
@@ -224,7 +229,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
Tuple2.of(new Event(5, "middle", 5.0), 7L),
// last element for high final watermark
Tuple2.of(new Event(5, "middle", 5.0), 100L)
- ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+ ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
@Override
public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
@@ -307,7 +312,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
Tuple2.of(new Event(1, "middle", 5.0), 7L),
Tuple2.of(new Event(3, "middle", 6.0), 9L),
Tuple2.of(new Event(3, "end", 7.0), 7L)
- ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+ ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
@Override
public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
@@ -444,7 +449,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
Tuple2.of(new Event(1, "middle", 2.0), 5L),
Tuple2.of(new Event(1, "start", 2.0), 4L),
Tuple2.of(new Event(1, "end", 2.0), 6L)
- ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+ ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
@Override
public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
@@ -515,7 +520,8 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
}
/**
- * Checks that a certain event sequence is recognized with an OR filter
+ * Checks that a certain event sequence is recognized with an OR filter.
+ *
* @throws Exception
*/
@Test
@@ -580,4 +586,4 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
env.execute();
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
index efe56b7..ef072ce 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import java.util.Objects;
+/**
+ * Example event for use in CEP tests. See also {@link SubEvent}.
+ */
public class Event {
private String name;
private double price;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
index effb382..cf5dc9d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
@@ -18,6 +18,9 @@
package org.apache.flink.cep;
+/**
+ * A subclass of {@link Event} for usage in tests.
+ */
public class SubEvent extends Event {
private final double volume;
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
index 8bc010a..e28e77d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
@@ -19,12 +19,16 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.util.TestLogger;
+
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+/**
+ * Tests for {@link DeweyNumber}.
+ */
public class DeweyNumberTest extends TestLogger {
@Test
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
new file mode 100644
index 0000000..910907f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Integration tests covering {@link IterativeCondition} usage.
+ */
+@SuppressWarnings("unchecked")
+public class IterativeConditionsITCase extends TestLogger {
+
+ ////////////////////// Iterative BooleanConditions /////////////////////////
+
+ private final Event startEvent1 = new Event(40, "start", 1.0);
+ private final Event startEvent2 = new Event(40, "start", 2.0);
+ private final Event startEvent3 = new Event(40, "start", 3.0);
+ private final Event startEvent4 = new Event(40, "start", 4.0);
+ private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10);
+ private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
+ private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
+ private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10);
+ private final Event nextOne = new Event(44, "next-one", 1.0);
+ private final Event endEvent = new Event(46, "end", 1.0);
+
+ @Test
+ public void testIterativeWithBranchingPatternEager() {
+ List<List<Event>> actual = testIterativeWithBranchingPattern(true);
+
+ compareMaps(actual,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+ )
+ );
+ }
+
+ @Test
+ public void testIterativeWithBranchingPatternCombinations() {
+ List<List<Event>> actual = testIterativeWithBranchingPattern(false);
+
+ compareMaps(actual,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+ Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+ Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+ )
+ );
+ }
+
+ private List<List<Event>> testIterativeWithBranchingPattern(boolean eager) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1));
+ inputEvents.add(new StreamRecord<Event>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(startEvent2, 4));
+ inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<Event>(middleEvent4, 5));
+ inputEvents.add(new StreamRecord<>(nextOne, 6));
+ inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+ Pattern<Event, ?> pattern = eager
+ ? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ })
+ .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore()
+ .followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ })
+ : Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ })
+ .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore().allowCombinations()
+ .followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
+
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
+ if (!value.getName().startsWith("foo")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event event : ctx.getEventsForPattern("middle")) {
+ sum += event.getPrice();
+ }
+ sum += value.getPrice();
+ return Double.compare(sum, 5.0) < 0;
+ }
+ }
+
+ @Test
+ public void testIterativeWithLoopingStartingEager() {
+ List<List<Event>> actual = testIterativeWithLoopingStarting(true);
+
+ compareMaps(actual,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, startEvent2, endEvent),
+ Lists.newArrayList(startEvent1, endEvent),
+ Lists.newArrayList(startEvent2, endEvent),
+ Lists.newArrayList(startEvent3, endEvent),
+ Lists.newArrayList(endEvent)
+ )
+ );
+ }
+
+ @Test
+ public void testIterativeWithLoopingStartingCombination() {
+ List<List<Event>> actual = testIterativeWithLoopingStarting(false);
+
+ compareMaps(actual,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, startEvent2, endEvent),
+ Lists.newArrayList(startEvent1, startEvent3, endEvent),
+ Lists.newArrayList(startEvent1, endEvent),
+ Lists.newArrayList(startEvent2, endEvent),
+ Lists.newArrayList(startEvent3, endEvent),
+ Lists.newArrayList(endEvent)
+ )
+ );
+ }
+
+ private List<List<Event>> testIterativeWithLoopingStarting(boolean eager) {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+ inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+ inputEvents.add(new StreamRecord<>(startEvent3, 3L));
+ inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+ // For now, a pattern inherits its continuity from the followedBy() or next() call that introduces it.
+ // A looping pattern at the very start of the pattern graph has no such call, so the default applies:
+ // relaxed continuity (as in followedBy()).
+
+ Pattern<Event, ?> pattern = eager
+ ? Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().optional()
+ .followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ })
+ : Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().allowCombinations().optional()
+ .followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ return feedNFA(inputEvents, nfa);
+ }
+
+ private static class MyEventIterCondition extends IterativeCondition<Event> {
+
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ if (!value.getName().equals("start")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event event : ctx.getEventsForPattern("start")) {
+ sum += event.getPrice();
+ }
+ sum += value.getPrice();
+ return Double.compare(sum, 5.0) < 0;
+ }
+ }
+
+ @Test
+ public void testIterativeWithPrevPatternDependency() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+ inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+ inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).oneOrMore().followedBy("end").where(new IterativeCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ if (!value.getName().equals("end")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event event : ctx.getEventsForPattern("start")) {
+ sum += event.getPrice();
+ }
+ return Double.compare(sum, 2.0) >= 0;
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, startEvent2, endEvent),
+ Lists.newArrayList(startEvent2, endEvent)
+ )
+ );
+ }
+
+ @Test
+ public void testIterativeWithABACPattern() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1
+ inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
+
+ inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
+ inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
+ inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
+
+ inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
+ inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
+ inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
+ inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+ private static final long serialVersionUID = 2178338526904474690L;
+
+ @Override
+ public boolean filter(SubEvent value) throws Exception {
+ return value.getName().startsWith("foo");
+ }
+ }).followedBy("middle2").where(new IterativeCondition<Event>() {
+ private static final long serialVersionUID = -1223388426808292695L;
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ if (!value.getName().equals("start")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event e: ctx.getEventsForPattern("middle2")) {
+ sum += e.getPrice();
+ }
+ sum += value.getPrice();
+ return Double.compare(sum, 5.0) <= 0;
+ }
+ }).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 562590474115118323L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("end");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent),
+ Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent),
+ Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent),
+ Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent),
+ Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent)
+ )
+ );
+ }
+
+ @Test
+ public void testIterativeWithPrevPatternDependencyAfterBranching() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+ inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+ inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
+ inputEvents.add(new StreamRecord<>(startEvent3, 5L));
+ inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
+ inputEvents.add(new StreamRecord<>(endEvent, 7L));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 6215754202506583964L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ }).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+ private static final long serialVersionUID = 2178338526904474690L;
+
+ @Override
+ public boolean filter(SubEvent value) throws Exception {
+ return value.getName().startsWith("foo");
+ }
+ }).followedByAny("end").where(new IterativeCondition<Event>() {
+ private static final long serialVersionUID = 7056763917392056548L;
+
+ @Override
+ public boolean filter(Event value, Context<Event> ctx) throws Exception {
+ if (!value.getName().equals("end")) {
+ return false;
+ }
+
+ double sum = 0.0;
+ for (Event event : ctx.getEventsForPattern("start")) {
+ sum += event.getPrice();
+ }
+ return Double.compare(sum, 2.0) >= 0;
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent),
+ Lists.newArrayList(startEvent2, middleEvent1, endEvent),
+ Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent),
+ Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent),
+ Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent),
+ Lists.newArrayList(startEvent2, middleEvent2, endEvent),
+ Lists.newArrayList(startEvent3, middleEvent2, endEvent)
+ )
+ );
+ }
+}
[2/3] flink git commit: [FLINK-6137] Activate strict checkstyle for
flink-cep
Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index d00bbb7..92b49d3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -18,34 +18,37 @@
package org.apache.flink.cep.nfa;
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Doubles;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
+
+import com.google.common.collect.Lists;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
import static org.junit.Assert.assertEquals;
+/**
+ * General tests for {@link NFA} features. See also {@link IterativeConditionsITCase}, {@link NotPatternITCase},
+ * {@link SameElementITCase} for more specific tests.
+ */
@SuppressWarnings("unchecked")
public class NFAITCase extends TestLogger {
@@ -234,7 +237,7 @@ public class NFAITCase extends TestLogger {
/**
* Tests that the NFA successfully filters out expired elements with respect to the window
- * length
+ * length.
*/
@Test
public void testSimplePatternWithTimeWindowNFA() {
@@ -251,7 +254,6 @@ public class NFAITCase extends TestLogger {
events.add(new StreamRecord<>(endEvent = new Event(5, "end", 1.0), 11));
events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
-
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 7907391379273505897L;
@@ -373,7 +375,7 @@ public class NFAITCase extends TestLogger {
SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
- Event endEvent= new Event(46, "end", 1.0);
+ Event endEvent = new Event(46, "end", 1.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -1418,7 +1420,7 @@ public class NFAITCase extends TestLogger {
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, end1),
+ Lists.newArrayList(startEvent, end1),
Lists.newArrayList(end1)
));
}
@@ -1458,9 +1460,9 @@ public class NFAITCase extends TestLogger {
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
- Lists.newArrayList(startEvent, middleEvent1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+ Lists.newArrayList(startEvent, middleEvent1),
Lists.newArrayList(startEvent)
));
}
@@ -1499,10 +1501,10 @@ public class NFAITCase extends TestLogger {
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3),
- Lists.newArrayList(middleEvent1, middleEvent2),
+ Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3),
+ Lists.newArrayList(middleEvent1, middleEvent2),
Lists.newArrayList(middleEvent1),
- Lists.newArrayList(middleEvent2, middleEvent3),
+ Lists.newArrayList(middleEvent2, middleEvent3),
Lists.newArrayList(middleEvent2),
Lists.newArrayList(middleEvent3)
));
@@ -1539,7 +1541,7 @@ public class NFAITCase extends TestLogger {
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1),
+ Lists.newArrayList(startEvent, middleEvent1),
Lists.newArrayList(startEvent)
));
}
@@ -1579,9 +1581,9 @@ public class NFAITCase extends TestLogger {
final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
- Lists.newArrayList(startEvent, middleEvent1)
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+ Lists.newArrayList(startEvent, middleEvent1)
));
}
@@ -1905,12 +1907,12 @@ public class NFAITCase extends TestLogger {
/////////////////////////////// Consecutive ////////////////////////////////////////
private static class ConsecutiveData {
- static final Event startEvent = new Event(40, "c", 1.0);
- static final Event middleEvent1 = new Event(41, "a", 2.0);
- static final Event middleEvent2 = new Event(42, "a", 3.0);
- static final Event middleEvent3 = new Event(43, "a", 4.0);
- static final Event middleEvent4 = new Event(43, "a", 5.0);
- static final Event end = new Event(44, "b", 5.0);
+ private static final Event startEvent = new Event(40, "c", 1.0);
+ private static final Event middleEvent1 = new Event(41, "a", 2.0);
+ private static final Event middleEvent2 = new Event(42, "a", 3.0);
+ private static final Event middleEvent3 = new Event(43, "a", 4.0);
+ private static final Event middleEvent4 = new Event(43, "a", 5.0);
+ private static final Event end = new Event(44, "b", 5.0);
private ConsecutiveData() {
}
@@ -2374,7 +2376,6 @@ public class NFAITCase extends TestLogger {
assertEquals(true, nfa.isEmpty());
}
-
@Test
public void testZeroOrMoreClearingBuffer() {
Event startEvent = new Event(40, "c", 1.0);
@@ -2418,385 +2419,6 @@ public class NFAITCase extends TestLogger {
assertEquals(true, nfa.isEmpty());
}
-
- ////////////////////// Iterative BooleanConditions /////////////////////////
-
- private final Event startEvent1 = new Event(40, "start", 1.0);
- private final Event startEvent2 = new Event(40, "start", 2.0);
- private final Event startEvent3 = new Event(40, "start", 3.0);
- private final Event startEvent4 = new Event(40, "start", 4.0);
- private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10);
- private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
- private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
- private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10);
- private final Event nextOne = new Event(44, "next-one", 1.0);
- private final Event endEvent = new Event(46, "end", 1.0);
-
- @Test
- public void testIterativeWithBranchingPatternEager() {
- List<List<Event>> actual = testIterativeWithBranchingPattern(true);
-
- compareMaps(actual,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
- Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
- Lists.newArrayList(startEvent1, endEvent, middleEvent1),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3)
- )
- );
- }
-
- @Test
- public void testIterativeWithBranchingPatternCombinations() {
- List<List<Event>> actual = testIterativeWithBranchingPattern(false);
-
- compareMaps(actual,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
- Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
- Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
- Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
- Lists.newArrayList(startEvent1, endEvent, middleEvent1),
- Lists.newArrayList(startEvent2, endEvent, middleEvent3)
- )
- );
- }
-
- private List<List<Event>> testIterativeWithBranchingPattern(boolean eager) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- inputEvents.add(new StreamRecord<>(startEvent1, 1));
- inputEvents.add(new StreamRecord<Event>(middleEvent1, 2));
- inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
- inputEvents.add(new StreamRecord<>(startEvent2, 4));
- inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
- inputEvents.add(new StreamRecord<Event>(middleEvent4, 5));
- inputEvents.add(new StreamRecord<>(nextOne, 6));
- inputEvents.add(new StreamRecord<>(endEvent, 8));
-
- Pattern<Event, ?> pattern = eager
- ? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- })
- .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore()
- .followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- })
- : Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- })
- .followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore().allowCombinations()
- .followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
-
- private static final long serialVersionUID = 6215754202506583964L;
-
- @Override
- public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
- if (!value.getName().startsWith("foo")) {
- return false;
- }
-
- double sum = 0.0;
- for (Event event : ctx.getEventsForPattern("middle")) {
- sum += event.getPrice();
- }
- sum += value.getPrice();
- return Double.compare(sum, 5.0) < 0;
- }
- }
-
- @Test
- public void testIterativeWithLoopingStartingEager() {
- List<List<Event>> actual = testIterativeWithLoopingStarting(true);
-
- compareMaps(actual,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, startEvent2, endEvent),
- Lists.newArrayList(startEvent1, endEvent),
- Lists.newArrayList(startEvent2, endEvent),
- Lists.newArrayList(startEvent3, endEvent),
- Lists.newArrayList(endEvent)
- )
- );
- }
-
- @Test
- public void testIterativeWithLoopingStartingCombination() {
- List<List<Event>> actual = testIterativeWithLoopingStarting(false);
-
- compareMaps(actual,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, startEvent2, endEvent),
- Lists.newArrayList(startEvent1, startEvent3, endEvent),
- Lists.newArrayList(startEvent1, endEvent),
- Lists.newArrayList(startEvent2, endEvent),
- Lists.newArrayList(startEvent3, endEvent),
- Lists.newArrayList(endEvent)
- )
- );
- }
-
- private List<List<Event>> testIterativeWithLoopingStarting(boolean eager) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- inputEvents.add(new StreamRecord<>(startEvent1, 1L));
- inputEvents.add(new StreamRecord<>(startEvent2, 2L));
- inputEvents.add(new StreamRecord<>(startEvent3, 3L));
- inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
- // for now, a pattern inherits its continuity property from the followedBy() or next(), and the default
- // behavior (which is the one applied in the case that the pattern graph starts with such a pattern)
- // of a looping pattern is with relaxed continuity (as in followedBy).
-
- Pattern<Event, ?> pattern = eager
- ? Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().optional()
- .followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- })
- : Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().allowCombinations().optional()
- .followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- private static class MyEventIterCondition extends IterativeCondition<Event> {
-
- private static final long serialVersionUID = 6215754202506583964L;
-
- @Override
- public boolean filter(Event value, Context<Event> ctx) throws Exception {
- if (!value.getName().equals("start")) {
- return false;
- }
-
- double sum = 0.0;
- for (Event event : ctx.getEventsForPattern("start")) {
- sum += event.getPrice();
- }
- sum += value.getPrice();
- return Double.compare(sum, 5.0) < 0;
- }
- }
-
- @Test
- public void testIterativeWithPrevPatternDependency() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- inputEvents.add(new StreamRecord<>(startEvent1, 1L));
- inputEvents.add(new StreamRecord<>(startEvent2, 2L));
- inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 6215754202506583964L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- }).oneOrMore().followedBy("end").where(new IterativeCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value, Context<Event> ctx) throws Exception {
- if (!value.getName().equals("end")) {
- return false;
- }
-
- double sum = 0.0;
- for (Event event : ctx.getEventsForPattern("start")) {
- sum += event.getPrice();
- }
- return Double.compare(sum, 2.0) >= 0;
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, startEvent2, endEvent),
- Lists.newArrayList(startEvent2, endEvent)
- )
- );
- }
-
- @Test
- public void testIterativeWithABACPattern() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1
- inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
-
- inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
- inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
- inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
-
- inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
- inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
- inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
- inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 6215754202506583964L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- }).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
- private static final long serialVersionUID = 2178338526904474690L;
-
- @Override
- public boolean filter(SubEvent value) throws Exception {
- return value.getName().startsWith("foo");
- }
- }).followedBy("middle2").where(new IterativeCondition<Event>() {
- private static final long serialVersionUID = -1223388426808292695L;
-
- @Override
- public boolean filter(Event value, Context<Event> ctx) throws Exception {
- if (!value.getName().equals("start")) {
- return false;
- }
-
- double sum = 0.0;
- for (Event e: ctx.getEventsForPattern("middle2")) {
- sum += e.getPrice();
- }
- sum += value.getPrice();
- return Double.compare(sum, 5.0) <= 0;
- }
- }).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 562590474115118323L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("end");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent),
- Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent),
- Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent),
- Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent),
- Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent)
- )
- );
- }
-
- @Test
- public void testIterativeWithPrevPatternDependencyAfterBranching() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- inputEvents.add(new StreamRecord<>(startEvent1, 1L));
- inputEvents.add(new StreamRecord<>(startEvent2, 2L));
- inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
- inputEvents.add(new StreamRecord<>(startEvent3, 5L));
- inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
- inputEvents.add(new StreamRecord<>(endEvent, 7L));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 6215754202506583964L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("start");
- }
- }).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
- private static final long serialVersionUID = 2178338526904474690L;
-
- @Override
- public boolean filter(SubEvent value) throws Exception {
- return value.getName().startsWith("foo");
- }
- }).followedByAny("end").where(new IterativeCondition<Event>() {
- private static final long serialVersionUID = 7056763917392056548L;
-
- @Override
- public boolean filter(Event value, Context<Event> ctx) throws Exception {
- if (!value.getName().equals("end")) {
- return false;
- }
-
- double sum = 0.0;
- for (Event event : ctx.getEventsForPattern("start")) {
- sum += event.getPrice();
- }
- return Double.compare(sum, 2.0) >= 0;
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns,
- Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent),
- Lists.newArrayList(startEvent2, middleEvent1, endEvent),
- Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent),
- Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent),
- Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent),
- Lists.newArrayList(startEvent2, middleEvent2, endEvent),
- Lists.newArrayList(startEvent3, middleEvent2, endEvent)
- )
- );
- }
-
-
/////////////////////////////////////// Skip till next /////////////////////////////
@Test
@@ -2809,7 +2431,7 @@ public class NFAITCase extends TestLogger {
SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
- Event endEvent= new Event(46, "end", 1.0);
+ Event endEvent = new Event(46, "end", 1.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -2868,7 +2490,7 @@ public class NFAITCase extends TestLogger {
SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
- Event endEvent= new Event(46, "end", 1.0);
+ Event endEvent = new Event(46, "end", 1.0);
inputEvents.add(new StreamRecord<>(startEvent, 1));
inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -2919,1130 +2541,76 @@ public class NFAITCase extends TestLogger {
));
}
-
- ///////////////////////////////////////// Not pattern /////////////////////////////////////////////////
-
@Test
- public void testNotNext() {
+ public void testMultipleTakesVersionCollision() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(41, "a", 3.0);
+ Event middleEvent3 = new Event(41, "a", 4.0);
+ Event middleEvent4 = new Event(41, "a", 5.0);
+ Event middleEvent5 = new Event(41, "a", 6.0);
+ Event end = new Event(44, "b", 5.0);
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(middleEvent4, 6));
+ inputEvents.add(new StreamRecord<>(middleEvent5, 7));
+ inputEvents.add(new StreamRecord<>(end, 10));
Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5167288560432018992L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notNext("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2242479288129905510L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 1404509325548220892L;
+ private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("c");
}
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -8907427230007830915L;
+ }).followedBy("middle1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
+ return value.getName().equals("a");
}
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, c1, d),
- Lists.newArrayList(a1, c2, d)
- ));
- }
-
- @Test
- public void testNotNextNoMatches() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c1 = new Event(41, "c", 2.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(b1, 2));
- inputEvents.add(new StreamRecord<>(c1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -339500190577666439L;
+ }).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("a");
}
- }).notNext("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -6913980632538046451L;
+ }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("b");
}
- }).followedBy("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 3332196998905139891L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2086563479959018387L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
});
NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- assertEquals(0, matches.size());
- }
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
- @Test
- public void testNotNextNoMatchesAtTheEnd() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
- Event b1 = new Event(42, "b", 3.0);
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(c2, 3));
- inputEvents.add(new StreamRecord<>(d, 4));
- inputEvents.add(new StreamRecord<>(b1, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 1672995058886176627L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 6003621617520261554L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedByAny("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 887700237024758417L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- }).notNext("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5239529076086933032L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- assertEquals(0, matches.size());
- }
-
- @Test
- public void testNotFollowedBy() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -2641662468313191976L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -3632144132379494778L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 3818766882138348167L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2033204730795451288L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches,Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, c1, d)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeOptional() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -2454396370205097543L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2749547391611263290L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -4989511337298217255L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).optional().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -8466223836652936608L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches,Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, c1, d)
- ));
- }
-
- @Test
- public void testTimesWithNotFollowedBy() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event b1 = new Event(41, "b", 2.0);
- Event c = new Event(42, "c", 3.0);
- Event b2 = new Event(43, "b", 4.0);
- Event d = new Event(43, "d", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(b1, 2));
- inputEvents.add(new StreamRecord<>(c, 3));
- inputEvents.add(new StreamRecord<>(b2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -2568839911852184515L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -3632232424064269636L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 3685596793523534611L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 1960758663575587243L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches,Lists.<List<Event>>newArrayList());
- }
-
- @Test
- public void testIgnoreStateOfTimesWithNotFollowedBy() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event e = new Event(41, "e", 2.0);
- Event c1 = new Event(42, "c", 3.0);
- Event b1 = new Event(43, "b", 4.0);
- Event c2 = new Event(44, "c", 5.0);
- Event d1 = new Event(45, "d", 6.0);
- Event d2 = new Event(46, "d", 7.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(d1, 2));
- inputEvents.add(new StreamRecord<>(e, 1));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d2, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2814850350025111940L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 4988756153568853834L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -225909103322018778L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -924294627956373696L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, d1)
- ));
- }
-
- @Test
- public void testTimesWithNotFollowedByAfter() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event e = new Event(41, "e", 2.0);
- Event c1 = new Event(42, "c", 3.0);
- Event b1 = new Event(43, "b", 4.0);
- Event b2 = new Event(44, "b", 5.0);
- Event d1 = new Event(46, "d", 7.0);
- Event d2 = new Event(47, "d", 8.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(d1, 2));
- inputEvents.add(new StreamRecord<>(e, 1));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(b2, 3));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(d2, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 6193105689601702341L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5195859580923169111L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 4973027956103783831L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 2724622546678984894L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList());
- }
-
- @Test
- public void testNotFollowedByBeforeOptionalAtTheEnd() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c2 = new Event(43, "c", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -4289351792573443294L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -4989574608417523507L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -5940131818629290579L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).optional();
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches,Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, c1),
- Lists.newArrayList(a1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeOptionalTimes() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event c1 = new Event(41, "c", 2.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c2 = new Event(43, "c", 4.0);
- Event d = new Event(43, "d", 4.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(c1, 2));
- inputEvents.add(new StreamRecord<>(b1, 3));
- inputEvents.add(new StreamRecord<>(c2, 4));
- inputEvents.add(new StreamRecord<>(d, 5));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -7885381452276160322L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 3471511260235826653L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 9073793782452363833L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 7972902718259767076L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches,Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a1, c1, c2, d)
- ));
- }
-
- @Test
- public void testNotFollowedByWithBranchingAtStart() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event a1 = new Event(40, "a", 1.0);
- Event b1 = new Event(42, "b", 3.0);
- Event c1 = new Event(41, "c", 2.0);
- Event a2 = new Event(41, "a", 4.0);
- Event c2 = new Event(43, "c", 5.0);
- Event d = new Event(43, "d", 6.0);
-
- inputEvents.add(new StreamRecord<>(a1, 1));
- inputEvents.add(new StreamRecord<>(b1, 2));
- inputEvents.add(new StreamRecord<>(c1, 3));
- inputEvents.add(new StreamRecord<>(a2, 4));
- inputEvents.add(new StreamRecord<>(c2, 5));
- inputEvents.add(new StreamRecord<>(d, 6));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -7866220136345465444L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 4957837489028234932L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).followedBy("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5569569968862808007L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = -8579678167937416269L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(a2, c2, d)
- ));
- }
-
- private static class NotFollowByData {
- static final Event a1 = new Event(40, "a", 1.0);
- static final Event b1 = new Event(41, "b", 2.0);
- static final Event b2 = new Event(42, "b", 3.0);
- static final Event b3 = new Event(42, "b", 4.0);
- static final Event c1 = new Event(43, "c", 5.0);
- static final Event b4 = new Event(42, "b", 6.0);
- static final Event b5 = new Event(42, "b", 7.0);
- static final Event b6 = new Event(42, "b", 8.0);
- static final Event d1 = new Event(43, "d", 9.0);
-
- private NotFollowByData() {
- }
- }
-
- @Test
- public void testNotNextAfterOneOrMoreSkipTillNext() {
- final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
- assertEquals(0, matches.size());
- }
-
- @Test
- public void testNotNextAfterOneOrMoreSkipTillAny() {
- final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1)
- ));
- }
-
- private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- int i = 0;
- inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
-
- Pattern<Event, ?> pattern = Pattern
- .<Event>begin("a").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- });
-
- pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).oneOrMore()
- .notNext("not c").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- })
- .followedBy("d").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- @Test
- public void testNotFollowedByNextAfterOneOrMoreEager() {
- final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
- assertEquals(0, matches.size());
- }
-
- @Test
- public void testNotFollowedByAnyAfterOneOrMoreEager() {
- final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByNextAfterOneOrMoreCombinations() {
- final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
- assertEquals(0, matches.size());
- }
-
- @Test
- public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
- final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
- ));
- }
-
- private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- int i = 0;
- inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b3, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
- Pattern<Event, ?> pattern = Pattern
- .<Event>begin("a").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- });
-
- pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
- .where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
- .notFollowedBy("not c").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- })
- .followedBy("d").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- @Test
- public void testNotFollowedByAnyBeforeOneOrMoreEager() {
- final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
- final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeOneOrMoreEager() {
- final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeOneOrMoreCombinations() {
- final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
-
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- int i = 0;
- inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
- Pattern<Event, ?> pattern = Pattern
- .<Event>begin("a").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- })
- .notFollowedBy("not c").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- });
-
- pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
- .where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).oneOrMore();
-
- pattern = (eager ? pattern : pattern.allowCombinations())
- .followedBy("d").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- @Test
- public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
- final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
- final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
- final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
- ));
- }
-
- @Test
- public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
- final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
- compareMaps(matches, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
- Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
- ));
- }
-
- private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- int i = 0;
- inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
- inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
- Pattern<Event, ?> pattern = Pattern
- .<Event>begin("a").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- })
- .notFollowedBy("not c").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- });
-
- pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
- .where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).oneOrMore().optional();
-
- pattern = (eager ? pattern : pattern.allowCombinations())
- .followedBy("d").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("d");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- return feedNFA(inputEvents, nfa);
- }
-
- @Test
- public void testEagerZeroOrMoreSameElement() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event middleEvent2 = new Event(42, "a", 3.0);
- Event middleEvent3 = new Event(43, "a", 4.0);
- Event end1 = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent2, 4));
- inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
- inputEvents.add(new StreamRecord<>(middleEvent3, 6));
- inputEvents.add(new StreamRecord<>(middleEvent3, 6));
- inputEvents.add(new StreamRecord<>(end1, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
- Lists.newArrayList(startEvent, middleEvent1, end1),
- Lists.newArrayList(startEvent, end1)
- ));
- }
-
- @Test
- public void testMultipleTakesVersionCollision() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event middleEvent2 = new Event(41, "a", 3.0);
- Event middleEvent3 = new Event(41, "a", 4.0);
- Event middleEvent4 = new Event(41, "a", 5.0);
- Event middleEvent5 = new Event(41, "a", 6.0);
- Event end = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent2, 4));
- inputEvents.add(new StreamRecord<>(middleEvent3, 5));
- inputEvents.add(new StreamRecord<>(middleEvent4, 6));
- inputEvents.add(new StreamRecord<>(middleEvent5, 7));
- inputEvents.add(new StreamRecord<>(end, 10));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("middle1").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
-
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, end),
@@ -4054,386 +2622,4 @@ public class NFAITCase extends TestLogger {
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
));
}
-
- @Test
- public void testZeroOrMoreSameElement() {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event middleEvent1a = new Event(41, "a", 2.0);
- Event middleEvent2 = new Event(42, "a", 3.0);
- Event middleEvent3 = new Event(43, "a", 4.0);
- Event middleEvent3a = new Event(43, "a", 4.0);
- Event end1 = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
- inputEvents.add(new StreamRecord<>(middleEvent2, 4));
- inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
- inputEvents.add(new StreamRecord<>(middleEvent3, 6));
- inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
- inputEvents.add(new StreamRecord<>(end1, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
-
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
-
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
-
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
- Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
-
- Lists.newArrayList(startEvent, middleEvent1, end1),
- Lists.newArrayList(startEvent, middleEvent1a, end1),
- Lists.newArrayList(startEvent, middleEvent2, end1),
- Lists.newArrayList(startEvent, middleEvent3, end1),
- Lists.newArrayList(startEvent, middleEvent3a, end1),
-
- Lists.newArrayList(startEvent, end1)
- ));
- }
-
- @Test
- public void testSimplePatternWSameElement() throws Exception {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event end1 = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(end1, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).followedBy("end1").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, end1),
- Lists.newArrayList(startEvent, middleEvent1, end1)
- ));
- }
-
- @Test
- public void testIterativeConditionWSameElement() throws Exception {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event middleEvent1a = new Event(41, "a", 2.0);
- Event middleEvent1b = new Event(41, "a", 2.0);
- final Event end = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
- inputEvents.add(new StreamRecord<>(end, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
-
- private static final long serialVersionUID = -5566639743229703237L;
-
- @Override
- public boolean filter(Event value, Context<Event> ctx) throws Exception {
- double sum = 0.0;
- for (Event event: ctx.getEventsForPattern("middle")) {
- sum += event.getPrice();
- }
- return Double.compare(sum, 4.0) == 0;
- }
-
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
- ));
- }
-
- @Test
- public void testEndWLoopingWSameElement() throws Exception {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middleEvent1 = new Event(41, "a", 2.0);
- Event middleEvent1a = new Event(41, "a", 2.0);
- Event middleEvent1b = new Event(41, "a", 2.0);
- final Event end = new Event(44, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middleEvent1, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
- inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
- inputEvents.add(new StreamRecord<>(end, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedByAny("middle").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().optional();
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent),
- Lists.newArrayList(startEvent, middleEvent1),
- Lists.newArrayList(startEvent, middleEvent1a),
- Lists.newArrayList(startEvent, middleEvent1b),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
- Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
- Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
- ));
- }
-
- @Test
- public void testRepeatingPatternWSameElement() throws Exception {
- List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
- Event startEvent = new Event(40, "c", 1.0);
- Event middle1Event1 = new Event(40, "a", 2.0);
- Event middle1Event2 = new Event(40, "a", 3.0);
- Event middle1Event3 = new Event(40, "a", 4.0);
- Event middle2Event1 = new Event(40, "b", 5.0);
-
- inputEvents.add(new StreamRecord<>(startEvent, 1));
- inputEvents.add(new StreamRecord<>(middle1Event1, 3));
- inputEvents.add(new StreamRecord<>(middle1Event1, 3));
- inputEvents.add(new StreamRecord<>(middle1Event2, 3));
- inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
- inputEvents.add(new StreamRecord<>(middle2Event1, 6));
- inputEvents.add(new StreamRecord<>(middle1Event3, 7));
-
- Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("c");
- }
- }).followedBy("middle1").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- }).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("b");
- }
- }).optional().followedBy("end").where(new SimpleCondition<Event>() {
- private static final long serialVersionUID = 5726188262756267490L;
-
- @Override
- public boolean filter(Event value) throws Exception {
- return value.getName().equals("a");
- }
- });
-
- NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
- compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
- Lists.newArrayList(startEvent, middle1Event1),
-
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
- Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
-
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
- Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
-
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
-
- Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
- ));
- }
-
- ///////////////////////////////////////// Utility /////////////////////////////////////////////////
-
- private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
- List<List<Event>> resultingPatterns = new ArrayList<>();
-
- for (StreamRecord<Event> inputEvent : inputEvents) {
- Collection<Map<String, List<Event>>> patterns = nfa.process(
- inputEvent.getValue(),
- inputEvent.getTimestamp()).f0;
-
- for (Map<String, List<Event>> p: patterns) {
- List<Event> res = new ArrayList<>();
- for (List<Event> le: p.values()) {
- res.addAll(le);
- }
- resultingPatterns.add(res);
- }
- }
- return resultingPatterns;
- }
-
- private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
- Assert.assertEquals(expected.size(), actual.size());
-
- for (List<Event> p: actual) {
- Collections.sort(p, new EventComparator());
- }
-
- for (List<Event> p: expected) {
- Collections.sort(p, new EventComparator());
- }
-
- Collections.sort(actual, new ListEventComparator());
- Collections.sort(expected, new ListEventComparator());
- Assert.assertArrayEquals(expected.toArray(), actual.toArray());
- }
-
- private class ListEventComparator implements Comparator<List<Event>> {
-
- @Override
- public int compare(List<Event> o1, List<Event> o2) {
- int sizeComp = Integer.compare(o1.size(), o2.size());
- if (sizeComp == 0) {
- EventComparator comp = new EventComparator();
- for (int i = 0; i < o1.size(); i++) {
- int eventComp = comp.compare(o1.get(i), o2.get(i));
- if (eventComp != 0) {
- return eventComp;
- }
- }
- return 0;
- } else {
- return sizeComp;
- }
- }
- }
-
- private class EventComparator implements Comparator<Event> {
-
- @Override
- public int compare(Event o1, Event o2) {
- int nameComp = o1.getName().compareTo(o2.getName());
- int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
- int idComp = Integer.compare(o1.getId(), o2.getId());
- if (nameComp == 0) {
- if (priceComp == 0) {
- return idComp;
- } else {
- return priceComp;
- }
- } else {
- return nameComp;
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 2619764..2586342 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -18,7 +18,6 @@
package org.apache.flink.cep.nfa;
-import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
@@ -29,6 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -44,6 +45,9 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
+/**
+ * Tests for {@link NFA}.
+ */
public class NFATest extends TestLogger {
@Test
public void testSimpleNFA() {
@@ -147,7 +151,7 @@ public class NFATest extends TestLogger {
/**
* Tests that pruning shared buffer elements and computations state use the same window border
- * semantics (left side inclusive and right side exclusive)
+ * semantics (left side inclusive and right side exclusive).
*/
@Test
public void testTimeoutWindowPruningWindowBorders() {