You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:10 UTC
[flink] 02/05: [hotfix][cep] Introduced nfa test harness
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit abe7ae23d5dc969b9034b206b8d492903c790b30
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:16:07 2018 +0100
[hotfix][cep] Introduced nfa test harness
---
.../apache/flink/cep/nfa/AfterMatchSkipITCase.java | 110 ++++++------
.../org/apache/flink/cep/nfa/GreedyITCase.java | 4 +-
.../java/org/apache/flink/cep/nfa/GroupITCase.java | 8 +-
.../flink/cep/nfa/IterativeConditionsITCase.java | 4 +-
.../java/org/apache/flink/cep/nfa/NFAITCase.java | 91 ++++------
.../apache/flink/cep/nfa/NFAStateAccessTest.java | 41 ++---
.../flink/cep/nfa/NFAStatusChangeITCase.java | 20 ++-
.../java/org/apache/flink/cep/nfa/NFATest.java | 148 +++++++---------
.../org/apache/flink/cep/nfa/NotPatternITCase.java | 4 +-
.../apache/flink/cep/nfa/SameElementITCase.java | 11 +-
.../apache/flink/cep/nfa/TimesOrMoreITCase.java | 4 +-
.../org/apache/flink/cep/nfa/TimesRangeITCase.java | 4 +-
.../apache/flink/cep/nfa/UntilConditionITCase.java | 36 ++--
.../org/apache/flink/cep/utils/NFATestHarness.java | 191 +++++++++++++++++++++
.../flink/cep/{nfa => utils}/NFATestUtilities.java | 60 +------
15 files changed, 417 insertions(+), 319 deletions(-)
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index f6bf0a8..89f3bf8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -22,10 +22,10 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.aftermatch.SkipPastLastStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.FlinkRuntimeException;
@@ -40,9 +40,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
import static org.junit.Assert.assertThat;
/**
@@ -77,9 +75,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(3);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
@@ -141,9 +139,11 @@ public class AfterMatchSkipITCase extends TestLogger{
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+ .withAfterMatchSkipStrategy(skipStrategy)
+ .build();
- return feedNFA(streamEvents, nfa, skipStrategy);
+ return nfaTestHarness.feedRecords(streamEvents);
}
}
@@ -198,9 +198,11 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).oneOrMore();
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+ .withAfterMatchSkipStrategy(skipStrategy)
+ .build();
- return feedNFA(streamEvents, nfa, skipStrategy);
+ return nfaTestHarness.feedRecords(streamEvents);
}
}
@@ -231,9 +233,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(3);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3),
@@ -275,9 +277,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(2);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -318,9 +320,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).times(2);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -376,9 +378,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("d");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, b1, c1, d1)
@@ -415,9 +417,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}
);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a2, b2)
@@ -459,9 +461,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
@@ -498,9 +500,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(ab1, c1),
@@ -543,9 +545,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
@@ -573,9 +575,9 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}
);
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
//skip to first element of a match should throw exception if they are enabled,
//this mode is used in MATCH RECOGNIZE which assumes that skipping to first element
@@ -645,9 +647,11 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("c");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+ .withAfterMatchSkipStrategy(skipStrategy)
+ .build();
- return feedNFA(streamEvents, nfa, skipStrategy);
+ return nfaTestHarness.feedRecords(streamEvents);
}
}
@@ -686,9 +690,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
}).oneOrMore().consecutive();
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, b1),
@@ -728,9 +732,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Collections.singletonList(
Lists.newArrayList(a1, a2, a3, b1)
@@ -768,9 +772,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
@@ -809,9 +813,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
@@ -851,9 +855,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("b");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, a2, a3, b1),
@@ -913,9 +917,9 @@ public class AfterMatchSkipITCase extends TestLogger{
return value.getName().contains("d");
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a, b, c1, c2, c3, d),
@@ -962,9 +966,9 @@ public class AfterMatchSkipITCase extends TestLogger{
ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
}
});
- NFA<Event> nfa = compile(pattern, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
- List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
compareMaps(resultingPatterns, Lists.newArrayList(
Lists.newArrayList(a1, c1, b2),
@@ -991,22 +995,10 @@ public class AfterMatchSkipITCase extends TestLogger{
}
}).times(2);
- NFA<Event> nfa = compile(pattern, false);
-
SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
- NFAState nfaState = nfa.createInitialNFAState();
-
- for (StreamRecord<Event> inputEvent : inputEvents) {
- try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
- nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
- nfa.process(
- sharedBufferAccessor,
- nfaState,
- inputEvent.getValue(),
- inputEvent.getTimestamp(),
- matchSkipStrategy);
- }
- }
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+
+ nfaTestHarness.feedRecords(inputEvents);
assertThat(sharedBuffer.isEmpty(), Matchers.is(true));
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
index 9e00130..e33e59a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
index 93116ff..9387966 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.GroupPattern;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
@@ -32,8 +33,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
@@ -1077,7 +1078,8 @@ public class GroupITCase extends TestLogger {
NFAState nfaState = nfa.createInitialNFAState();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(c, a1, b1, d),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
index 9012379..ce28ddd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
@@ -33,8 +33,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2391d54..62d76db 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -21,11 +21,13 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.Quantifier;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,8 +50,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyLong;
@@ -414,7 +416,11 @@ public class NFAITCase extends TestLogger {
Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
Collection<Map<String, List<Event>>> matchedPatterns =
- nfa.process(sharedBufferAccessor, nfaState, event.getValue(), event.getTimestamp());
+ nfa.process(sharedBufferAccessor,
+ nfaState,
+ event.getValue(),
+ event.getTimestamp(),
+ AfterMatchSkipStrategy.noSkip());
resultingPatterns.addAll(matchedPatterns);
resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2342,11 +2348,13 @@ public class NFAITCase extends TestLogger {
NFAState nfaState = nfa.createInitialNFAState();
- nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 2);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 3);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent3, 4);
- nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+ nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+ nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent1, 2));
+ nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent2, 3));
+ nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent3, 4));
+ nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2387,10 +2395,11 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent, 5);
- nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+ nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+ nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent, 5));
+ nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2432,11 +2441,12 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
- nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+ nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2478,11 +2488,12 @@ public class NFAITCase extends TestLogger {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
- nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
- nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+ nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
//pruning element
nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2734,25 +2745,12 @@ public class NFAITCase extends TestLogger {
}
}).times(3).consecutive();
- NFA<Event> nfa = compile(pattern, false);
-
- List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
- NFAState nfaState = nfa.createInitialNFAState();
-
- for (StreamRecord<Event> inputEvent : inputEvents) {
- Collection<Map<String, List<Event>>> patterns = nfa.process(
- sharedBufferAccessor,
- nfaState,
- inputEvent.getValue(),
- inputEvent.getTimestamp());
-
- resultingPatterns.addAll(patterns);
- }
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+ Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
Assert.assertEquals(1L, resultingPatterns.size());
- Map<String, List<Event>> match = resultingPatterns.get(0);
+ Map<String, List<Event>> match = resultingPatterns.iterator().next();
Assert.assertArrayEquals(
match.get("start").toArray(),
Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
@@ -2809,25 +2807,12 @@ public class NFAITCase extends TestLogger {
}
});
- NFA<Event> nfa = compile(pattern, false);
-
- List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
- NFAState nfaState = nfa.createInitialNFAState();
-
- for (StreamRecord<Event> inputEvent : inputEvents) {
- Collection<Map<String, List<Event>>> patterns = nfa.process(
- sharedBufferAccessor,
- nfaState,
- inputEvent.getValue(),
- inputEvent.getTimestamp());
-
- resultingPatterns.addAll(patterns);
- }
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+ Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
Assert.assertEquals(1L, resultingPatterns.size());
- Map<String, List<Event>> match = resultingPatterns.get(0);
+ Map<String, List<Event>> match = resultingPatterns.iterator().next();
List<String> expectedOrder = Lists.newArrayList("a", "b", "aa", "bb", "ab");
List<String> resultOrder = new ArrayList<>();
@@ -2847,8 +2832,8 @@ public class NFAITCase extends TestLogger {
try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
NFA<Event> nfa = compile(pattern, false);
- nfa.process(accessor, nfa.createInitialNFAState(), a, 1);
- nfa.process(accessor, nfa.createInitialNFAState(), b, 2);
+ nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
+ nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
index 227fe4e..1be5f4f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.cep.utils.TestSharedBuffer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -32,7 +32,6 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
/**
@@ -99,22 +98,13 @@ public class NFAStateAccessTest {
}
});
- NFA<Event> nfa = compile(pattern, false);
-
TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
- for (StreamRecord<Event> inputEvent : inputEvents) {
- try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
- nfa.process(
- accessor,
- nfa.createInitialNFAState(),
- inputEvent.getValue(),
- inputEvent.getTimestamp());
- }
- }
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+ nfaTestHarness.consumeRecords(inputEvents);
- assertEquals(2, sharedBuffer.getStateReads());
- assertEquals(3, sharedBuffer.getStateWrites());
- assertEquals(5, sharedBuffer.getStateAccesses());
+ assertEquals(58, sharedBuffer.getStateReads());
+ assertEquals(33, sharedBuffer.getStateWrites());
+ assertEquals(91, sharedBuffer.getStateAccesses());
}
@Test
@@ -182,21 +172,12 @@ public class NFAStateAccessTest {
}
});
- NFA<Event> nfa = compile(pattern, false);
-
TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
- for (StreamRecord<Event> inputEvent : inputEvents) {
- try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
- nfa.process(
- accessor,
- nfa.createInitialNFAState(),
- inputEvent.getValue(),
- inputEvent.getTimestamp());
- }
- }
+ NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+ nfaTestHarness.consumeRecords(inputEvents);
- assertEquals(8, sharedBuffer.getStateReads());
- assertEquals(12, sharedBuffer.getStateWrites());
- assertEquals(20, sharedBuffer.getStateAccesses());
+ assertEquals(90, sharedBuffer.getStateReads());
+ assertEquals(31, sharedBuffer.getStateWrites());
+ assertEquals(121, sharedBuffer.getStateAccesses());
}
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 8cbfba1..d0a7853 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
@@ -47,6 +48,7 @@ public class NFAStatusChangeITCase {
private SharedBuffer<Event> sharedBuffer;
private SharedBufferAccessor<Event> sharedBufferAccessor;
+ private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
@Before
public void init() {
@@ -95,32 +97,32 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
- nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
// the status of the queue of ComputationStatus changed,
// more than one ComputationStatus is generated by the event from some ComputationStatus
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have changed
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -164,10 +166,10 @@ public class NFAStatusChangeITCase {
NFAState nfaState = nfa.createInitialNFAState();
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
nfaState.resetStateChanged();
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
assertTrue(nfaState.isStateChanged());
}
@@ -193,7 +195,7 @@ public class NFAStatusChangeITCase {
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
- nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+ nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
nfaState.resetStateChanged();
nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index e5f4c9e..638b8b5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -19,13 +19,11 @@
package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.BooleanConditions;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -39,10 +37,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
@@ -91,9 +87,7 @@ public class NFATest extends TestLogger {
states.add(endState);
states.add(endingState);
- NFA<Event> nfa = new NFA<>(states, 0, false);
-
- Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+ List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> firstPattern = new HashMap<>();
firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
@@ -106,14 +100,16 @@ public class NFATest extends TestLogger {
expectedPatterns.add(firstPattern);
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+ NFA<Event> nfa = new NFA<>(states, 0, false);
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+ Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@Test
public void testTimeoutWindowPruning() throws Exception {
- NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -121,7 +117,7 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
- Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+ List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> secondPattern = new HashMap<>();
secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
@@ -129,7 +125,10 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+ NFA<Event> nfa = createStartEndNFA();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+ Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -140,15 +139,17 @@ public class NFATest extends TestLogger {
*/
@Test
public void testWindowBorders() throws Exception {
- NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L));
- Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
+ List<Map<String, List<Event>>> expectedPatterns = Collections.emptyList();
+
+ NFA<Event> nfa = createStartEndNFA();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+ Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
assertEquals(expectedPatterns, actualPatterns);
}
@@ -159,7 +160,6 @@ public class NFATest extends TestLogger {
*/
@Test
public void testTimeoutWindowPruningWindowBorders() throws Exception {
- NFA<Event> nfa = createStartEndNFA();
List<StreamRecord<Event>> streamEvents = new ArrayList<>();
streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -167,7 +167,7 @@ public class NFATest extends TestLogger {
streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L));
streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L));
- Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+ List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
Map<String, List<Event>> secondPattern = new HashMap<>();
secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0)));
@@ -175,30 +175,12 @@ public class NFATest extends TestLogger {
expectedPatterns.add(secondPattern);
- Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+ NFA<Event> nfa = createStartEndNFA();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
- assertEquals(expectedPatterns, actualPatterns);
- }
+ Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
- public Collection<Map<String, List<Event>>> runNFA(
- NFA<Event> nfa, NFAState nfaState, List<StreamRecord<Event>> inputs) throws Exception {
- Set<Map<String, List<Event>>> actualPatterns = new HashSet<>();
-
- SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
- try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
- for (StreamRecord<Event> streamEvent : inputs) {
- nfa.advanceTime(sharedBufferAccessor, nfaState, streamEvent.getTimestamp());
- Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
- sharedBufferAccessor,
- nfaState,
- streamEvent.getValue(),
- streamEvent.getTimestamp());
-
- actualPatterns.addAll(matchedPatterns);
- }
- }
-
- return actualPatterns;
+ assertEquals(expectedPatterns, actualPatterns);
}
@Test
@@ -289,51 +271,49 @@ public class NFATest extends TestLogger {
patterns.add(pattern2);
patterns.add(pattern3);
- SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
- try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-
- for (Pattern<Event, ?> p : patterns) {
- NFA<Event> nfa = compile(p, false);
-
- Event a = new Event(40, "a", 1.0);
- Event b = new Event(41, "b", 2.0);
- Event c = new Event(42, "c", 3.0);
- Event b1 = new Event(41, "b", 3.0);
- Event b2 = new Event(41, "b", 4.0);
- Event b3 = new Event(41, "b", 5.0);
- Event d = new Event(43, "d", 4.0);
-
- NFAState nfaState = nfa.createInitialNFAState();
-
- nfa.process(sharedBufferAccessor, nfaState, a, 1);
- nfa.process(sharedBufferAccessor, nfaState, b, 2);
- nfa.process(sharedBufferAccessor, nfaState, c, 3);
- nfa.process(sharedBufferAccessor, nfaState, b1, 4);
- nfa.process(sharedBufferAccessor, nfaState, b2, 5);
- nfa.process(sharedBufferAccessor, nfaState, b3, 6);
- nfa.process(sharedBufferAccessor, nfaState, d, 7);
- nfa.process(sharedBufferAccessor, nfaState, a, 8);
-
- NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
-
- //serialize
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
- baos.close();
-
- // copy
- ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
- in.close();
- out.close();
-
- // deserialize
- ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
- NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
- bais.close();
- assertEquals(nfaState, copy);
- }
+ for (Pattern<Event, ?> p : patterns) {
+ NFA<Event> nfa = compile(p, false);
+
+ Event a = new Event(40, "a", 1.0);
+ Event b = new Event(41, "b", 2.0);
+ Event c = new Event(42, "c", 3.0);
+ Event b1 = new Event(41, "b", 3.0);
+ Event b2 = new Event(41, "b", 4.0);
+ Event b3 = new Event(41, "b", 5.0);
+ Event d = new Event(43, "d", 4.0);
+
+ NFAState nfaState = nfa.createInitialNFAState();
+
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+ nfaTestHarness.consumeRecord(new StreamRecord<>(a, 1));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(b, 2));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(c, 3));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(b1, 4));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(b2, 5));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(b3, 6));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
+ nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));
+
+ NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
+
+ //serialize
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
+ baos.close();
+
+ // copy
+ ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+ in.close();
+ out.close();
+
+ // deserialize
+ ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
+ NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+ bais.close();
+ assertEquals(nfaState, copy);
}
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 11a8484..6f33218 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index b7f1177..3d277af 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
@@ -33,8 +34,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
@@ -141,8 +142,9 @@ public void testClearingBuffer() throws Exception {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, b1, c1, d)
));
@@ -186,8 +188,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(a1, d1, d2, d3),
Lists.newArrayList(a1, d1, d2),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index 049c84b..93d8c56 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 203a1c2..dd85d87 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
/**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index be0f28b..39b23b4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -31,8 +32,8 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
import static org.apache.flink.cep.utils.NFAUtils.compile;
import static org.junit.Assert.assertEquals;
@@ -92,8 +93,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -142,8 +144,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -193,8 +196,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -244,8 +248,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, breaking)
@@ -292,8 +297,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -342,8 +348,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -394,8 +401,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -525,8 +533,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -579,8 +588,9 @@ public class UntilConditionITCase {
NFA<Event> nfa = compile(pattern, false);
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -631,10 +641,10 @@ public class UntilConditionITCase {
});
NFA<Event> nfa = compile(pattern, false);
-
NFAState nfaState = nfa.createInitialNFAState();
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
- final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+ final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
new file mode 100644
index 0000000..dd080fb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test harness for setting up {@link NFA}.
+ */
+public final class NFATestHarness {
+
+ private final SharedBuffer<Event> sharedBuffer;
+ private final NFA<Event> nfa;
+ private final NFAState nfaState;
+ private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+ private NFATestHarness(
+ SharedBuffer<Event> sharedBuffer,
+ NFA<Event> nfa,
+ NFAState nfaState,
+ AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ this.sharedBuffer = sharedBuffer;
+ this.nfa = nfa;
+ this.nfaState = nfaState;
+ this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+ }
+
+ /**
+ * Constructs a test harness starting from a given {@link Pattern}.
+ */
+ public static NFATestHarnessBuilderPattern forPattern(Pattern<Event, ?> pattern) {
+ return new NFATestHarnessBuilderPattern(pattern);
+ }
+
+ /**
+ * Constructs a test harness starting from a given {@link NFA}.
+ */
+ public static NFATestHarnessBuilderNFA forNFA(NFA<Event> nfa) {
+ return new NFATestHarnessBuilderNFA(nfa);
+ }
+
+ public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
+ List<List<Event>> resultingPatterns = new ArrayList<>();
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ resultingPatterns.addAll(feedRecord(inputEvent));
+ }
+ return resultingPatterns;
+ }
+
+ public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
+ List<List<Event>> resultingPatterns = new ArrayList<>();
+ Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+ for (Map<String, List<Event>> p : matches) {
+ List<Event> res = new ArrayList<>();
+ for (List<Event> le : p.values()) {
+ res.addAll(le);
+ }
+ resultingPatterns.add(res);
+ }
+ return resultingPatterns;
+ }
+
+ public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
+ List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+ for (StreamRecord<Event> inputEvent : inputEvents) {
+ resultingPatterns.addAll(consumeRecord(inputEvent));
+ }
+
+ return resultingPatterns;
+ }
+
+ public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent) throws Exception {
+ try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+ nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
+ return nfa.process(
+ sharedBufferAccessor,
+ nfaState,
+ inputEvent.getValue(),
+ inputEvent.getTimestamp(),
+ afterMatchSkipStrategy);
+ }
+ }
+
+ /**
+ * Builder for {@link NFATestHarness} that encapsulates {@link Pattern}.
+ */
+ public static class NFATestHarnessBuilderPattern extends NFATestHarnessBuilderBase {
+
+ private final Pattern<Event, ?> pattern;
+ private boolean timeoutHandling = false;
+
+ NFATestHarnessBuilderPattern(Pattern<Event, ?> pattern) {
+ super(pattern.getAfterMatchSkipStrategy());
+ this.pattern = pattern;
+ }
+
+ public NFATestHarnessBuilderBase withTimeoutHandling() {
+ this.timeoutHandling = true;
+ return this;
+ }
+
+ @Override
+ public NFATestHarness build() {
+ NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+ return new NFATestHarness(
+ sharedBuffer,
+ nfa,
+ nfa.createInitialNFAState(),
+ afterMatchSkipStrategy);
+ }
+ }
+
+ /**
+ * Builder for {@link NFATestHarness} that encapsulates {@link NFA}.
+ */
+ public static class NFATestHarnessBuilderNFA extends NFATestHarnessBuilderBase {
+
+ private final NFA<Event> nfa;
+ private NFAState nfaState;
+
+ NFATestHarnessBuilderNFA(NFA<Event> nfa) {
+ super(AfterMatchSkipStrategy.noSkip());
+ this.nfa = nfa;
+ this.nfaState = nfa.createInitialNFAState();
+ }
+
+ public NFATestHarnessBuilderBase withNFAState(NFAState nfaState) {
+ this.nfaState = nfaState;
+ return this;
+ }
+
+ @Override
+ public NFATestHarness build() {
+ return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+ }
+ }
+
+ /**
+ * Common builder, which can be used independent if we start with {@link Pattern} or {@link NFA}.
+ * Enables to provide custom services like {@link SharedBuffer} etc.
+ */
+ public abstract static class NFATestHarnessBuilderBase {
+
+ SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+ AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+ NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
+ this.afterMatchSkipStrategy = skipStrategy;
+ }
+
+ public NFATestHarnessBuilderBase withSharedBuffer(SharedBuffer<Event> sharedBuffer) {
+ this.sharedBuffer = sharedBuffer;
+ return this;
+ }
+
+ public NFATestHarnessBuilderBase withAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
+ this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+ return this;
+ }
+
+ public abstract NFATestHarness build();
+ }
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
similarity index 57%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
index 91e490e..ac45798 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
@@ -16,77 +16,29 @@
* limitations under the License.
*/
-package org.apache.flink.cep.nfa;
+package org.apache.flink.cep.utils;
import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Assert;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
/**
* Base method for IT tests of {@link NFA}. It provides utility methods.
*/
public class NFATestUtilities {
- public static List<List<Event>> feedNFA(
- List<StreamRecord<Event>> inputEvents,
- NFA<Event> nfa) throws Exception {
- return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), AfterMatchSkipStrategy.noSkip());
- }
-
- public static List<List<Event>> feedNFA(
- List<StreamRecord<Event>> inputEvents,
- NFA<Event> nfa,
- NFAState nfaState) throws Exception {
- return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
- }
-
- public static List<List<Event>> feedNFA(
- List<StreamRecord<Event>> inputEvents,
- NFA<Event> nfa,
- AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
- return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), afterMatchSkipStrategy);
- }
-
+ @Deprecated
public static List<List<Event>> feedNFA(
List<StreamRecord<Event>> inputEvents,
- NFA<Event> nfa,
- NFAState nfaState,
- AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
- List<List<Event>> resultingPatterns = new ArrayList<>();
-
- SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-
- for (StreamRecord<Event> inputEvent : inputEvents) {
- try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
- nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
- Collection<Map<String, List<Event>>> patterns = nfa.process(
- sharedBufferAccessor,
- nfaState,
- inputEvent.getValue(),
- inputEvent.getTimestamp(),
- afterMatchSkipStrategy);
- for (Map<String, List<Event>> p: patterns) {
- List<Event> res = new ArrayList<>();
- for (List<Event> le: p.values()) {
- res.addAll(le);
- }
- resultingPatterns.add(res);
- }
- }
- }
- return resultingPatterns;
+ NFA<Event> nfa) throws Exception {
+ NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+ return nfaTestHarness.feedRecords(inputEvents);
}
public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {