You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/06/17 01:10:29 UTC
[flink] branch master updated: [FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined
This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 84a0a9dd121 [FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined
84a0a9dd121 is described below
commit 84a0a9dd1213581a7e9e4e34f93cec0a70b4424c
Author: mayue.fight <ma...@bytedance.com>
AuthorDate: Thu Mar 31 14:53:29 2022 +0800
[FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined
This closes #19295.
---
docs/content.zh/docs/libs/cep.md | 31 +++++-
docs/content/docs/libs/cep.md | 31 +++++-
.../main/java/org/apache/flink/cep/nfa/NFA.java | 27 +++--
.../main/java/org/apache/flink/cep/nfa/State.java | 5 +
.../apache/flink/cep/nfa/compiler/NFACompiler.java | 24 ++++-
.../org/apache/flink/cep/operator/CepOperator.java | 14 ++-
.../java/org/apache/flink/cep/nfa/NFAITCase.java | 70 +++++++++++-
.../flink/cep/nfa/NFAStatusChangeITCase.java | 2 +-
.../org/apache/flink/cep/nfa/NotPatternITCase.java | 117 +++++++++++++++++++++
.../flink/cep/nfa/compiler/NFACompilerTest.java | 55 +++++++++-
.../org/apache/flink/cep/utils/NFATestHarness.java | 20 ++--
11 files changed, 366 insertions(+), 30 deletions(-)
diff --git a/docs/content.zh/docs/libs/cep.md b/docs/content.zh/docs/libs/cep.md
index 0e1c970b804..73e50b9746f 100644
--- a/docs/content.zh/docs/libs/cep.md
+++ b/docs/content.zh/docs/libs/cep.md
@@ -621,7 +621,7 @@ val start : Pattern[Event, _] = Pattern.begin("start")
2. `notFollowedBy()`,如果不想一个特定事件发生在两个事件之间的任何地方。
{{< hint warning >}}
-模式序列不能以`notFollowedBy()`结尾。
+如果模式序列没有定义时间约束,则不能以 `notFollowedBy()` 结尾。
{{< /hint >}}
{{< hint warning >}}
@@ -701,6 +701,35 @@ next.within(Time.seconds(10))
{{< /tab >}}
{{< /tabs >}}
+注意定义过时间约束的模式允许以 `notFollowedBy()` 结尾。
+例如,可以定义如下的模式:
+
+{{< tabs "df27eb6d-c532-430a-b56f-98ad4082e6d5" >}}
+{{< tab "Java" >}}
+```java
+Pattern.<Event>begin("start")
+ .next("middle").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+}).notFollowedBy("end").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+}).within(Time.seconds(10));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+Pattern.begin("start").where(_.getName().equals("a"))
+.notFollowedBy("end").where(_.getName == "b")
+.within(Time.seconds(10))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
#### 循环模式中的连续性
你可以在循环模式中使用和前面[章节](#组合模式)讲过的同样的连续性。
diff --git a/docs/content/docs/libs/cep.md b/docs/content/docs/libs/cep.md
index fc541e2afd9..c6bb7abf23a 100644
--- a/docs/content/docs/libs/cep.md
+++ b/docs/content/docs/libs/cep.md
@@ -636,7 +636,7 @@ or
2. `notFollowedBy()`, if you do not want an event type to be anywhere between two other event types.
{{< hint warning >}}
-A pattern sequence cannot end in `notFollowedBy()`.
+A pattern sequence cannot end with `notFollowedBy()` if the time interval is not defined via `within()`.
{{< /hint >}}
{{< hint warning >}}
@@ -718,6 +718,35 @@ next.within(Time.seconds(10))
{{< /tab >}}
{{< /tabs >}}
+Note that a pattern sequence can end with `notFollowedBy()` if a temporal constraint is defined via `within()`.
+For example, a pattern like:
+
+{{< tabs "df27eb6d-c532-430a-b56f-98ad4082e6d5" >}}
+{{< tab "Java" >}}
+```java
+Pattern.<Event>begin("start")
+ .next("middle").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+}).notFollowedBy("end").where(new SimpleCondition<Event>() {
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+}).within(Time.seconds(10));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+Pattern.begin("start").where(_.getName().equals("a"))
+.notFollowedBy("end").where(_.getName == "b")
+.within(Time.seconds(10))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
#### Contiguity within looping patterns
You can apply the same contiguity condition as discussed in the previous [section](#combining-patterns) within a looping pattern.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 1e8a90b6737..3f63a8f3e72 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -257,23 +257,30 @@ public class NFA<T> {
* @param nfaState The NFAState object that we need to affect while processing
* @param timestamp timestamp that indicates that there will be no more events with lower
* timestamp
- * @return all timed outed partial matches
+ * @return a tuple of all pending matches and all timed-out partial matches
* @throws Exception Thrown if the system cannot access the state.
*/
- public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
- final SharedBufferAccessor<T> sharedBufferAccessor,
- final NFAState nfaState,
- final long timestamp)
- throws Exception {
-
+ public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>>
+ advanceTime(
+ final SharedBufferAccessor<T> sharedBufferAccessor,
+ final NFAState nfaState,
+ final long timestamp)
+ throws Exception {
+
+ final Collection<Map<String, List<T>>> pendingMatches = new ArrayList<>();
final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
final PriorityQueue<ComputationState> newPartialMatches =
new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
for (ComputationState computationState : nfaState.getPartialMatches()) {
if (isStateTimedOut(computationState, timestamp)) {
-
- if (handleTimeout) {
+ if (getState(computationState).isPending()) {
+ // extract the Pending State
+ Map<String, List<T>> pendingPattern =
+ sharedBufferAccessor.materializeMatch(
+ extractCurrentMatches(sharedBufferAccessor, computationState));
+ pendingMatches.add(pendingPattern);
+ } else if (handleTimeout) {
// extract the timed out event pattern
Map<String, List<T>> timedOutPattern =
sharedBufferAccessor.materializeMatch(
@@ -297,7 +304,7 @@ public class NFA<T> {
sharedBufferAccessor.advanceTime(timestamp);
- return timeoutResult;
+ return Tuple2.of(pendingMatches, timeoutResult);
}
private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 5d9754ade09..d611e3e61bb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -135,11 +135,16 @@ public class State<T> implements Serializable {
return stateType == StateType.Stop;
}
+ public boolean isPending() {
+ return stateType == StateType.Pending;
+ }
+
/** Set of valid state types. */
public enum StateType {
Start, // the state is a starting state for the NFA
Final, // the state is a final state for the NFA
Normal, // the state is neither a start nor a final state
+ Pending, // the state is pending and waiting for timeout handling
Stop
}
}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 0e9ed184557..15b2a7cea0c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -157,11 +157,8 @@ public class NFACompiler {
* create multiple NFAs.
*/
void compileFactory() {
- if (currentPattern.getQuantifier().getConsumingStrategy()
- == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
- throw new MalformedPatternException(
- "NotFollowedBy is not supported as a last part of a Pattern!");
- }
+
+ Pattern<T, ?> lastPattern = currentPattern;
checkPatternNameUniqueness();
@@ -174,6 +171,13 @@ public class NFACompiler {
sinkState = createMiddleStates(sinkState);
// add the beginning state
createStartState(sinkState);
+
+ if (lastPattern.getQuantifier().getConsumingStrategy()
+ == Quantifier.ConsumingStrategy.NOT_FOLLOW
+ && getWindowTime() == 0) {
+ throw new MalformedPatternException(
+ "NotFollowedBy is not supported without windowTime as a last part of a Pattern!");
+ }
}
AfterMatchSkipStrategy getAfterMatchSkipStrategy() {
@@ -304,6 +308,16 @@ public class NFACompiler {
if (currentPattern.getQuantifier().getConsumingStrategy()
== Quantifier.ConsumingStrategy.NOT_FOLLOW) {
// skip notFollow patterns, they are converted into edge conditions
+ if (getWindowTime() > 0 && lastSink.isFinal()) {
+ final State<T> notFollow =
+ createState(currentPattern.getName(), State.StateType.Pending);
+ final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
+ final State<T> stopState =
+ createStopState(notCondition, currentPattern.getName());
+ notFollow.addProceed(stopState, notCondition);
+ notFollow.addIgnore(new RichNotCondition<>(notCondition));
+ lastSink = notFollow;
+ }
} else if (currentPattern.getQuantifier().getConsumingStrategy()
== Quantifier.ConsumingStrategy.NOT_NEXT) {
final State<T> notNext =
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index d1cfd537bd9..b7ea2373a59 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -422,8 +422,18 @@ public class CepOperator<IN, KEY, OUT>
*/
private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
- Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
- nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+ Tuple2<
+ Collection<Map<String, List<IN>>>,
+ Collection<Tuple2<Map<String, List<IN>>, Long>>>
+ pendingMatchesAndTimeout =
+ nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+
+ Collection<Map<String, List<IN>>> pendingMatches = pendingMatchesAndTimeout.f0;
+ Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = pendingMatchesAndTimeout.f1;
+
+ if (!pendingMatches.isEmpty()) {
+ processMatchedSequences(pendingMatches, timestamp);
+ }
if (!timedOut.isEmpty()) {
processTimedOutSequences(timedOut);
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 175014bbb67..4bd899da353 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -477,7 +477,7 @@ public class NFAITCase extends TestLogger {
for (StreamRecord<Event> event : events) {
Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
- nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
+ nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp()).f1;
Collection<Map<String, List<Event>>> matchedPatterns =
nfa.process(
sharedBufferAccessor,
@@ -497,6 +497,74 @@ public class NFAITCase extends TestLogger {
assertEquals(expectedTimeoutPatterns, resultingTimeoutPatterns);
}
+ @Test
+ public void testPendingStateMatches() throws Exception {
+ List<StreamRecord<Event>> events = new ArrayList<>();
+ Set<Map<String, List<Event>>> resultingPendingMatches = new HashSet<>();
+ Set<Map<String, List<Event>>> expectedPendingMatches = new HashSet<>();
+
+ events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+ events.add(new StreamRecord<>(new Event(2, "middle", 1.0), 4));
+ events.add(new StreamRecord<>(new Event(3, "start", 1.0), 5));
+ events.add(new StreamRecord<>(new Event(4, "start", 1.0), 11));
+ events.add(new StreamRecord<>(new Event(5, "middle", 1.0), 18));
+
+ Map<String, List<Event>> pendingMatches1 = new HashMap<>();
+ pendingMatches1.put("start", Collections.singletonList(new Event(3, "start", 1.0)));
+
+ Map<String, List<Event>> pendingMatches2 = new HashMap<>();
+ pendingMatches2.put("start", Collections.singletonList(new Event(4, "start", 1.0)));
+
+ expectedPendingMatches.add(pendingMatches1);
+ expectedPendingMatches.add(pendingMatches2);
+
+ Pattern<Event, ?> pattern =
+ Pattern.<Event>begin("start")
+ .where(
+ new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
+ 7907391379273505897L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("start");
+ }
+ })
+ .notFollowedBy("middle")
+ .where(
+ new SimpleCondition<Event>() {
+ private static final long serialVersionUID =
+ -3268741540234334074L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("middle");
+ }
+ })
+ .within(Time.milliseconds(5));
+
+ NFA<Event> nfa = compile(pattern, true);
+
+ NFAState nfaState = nfa.createInitialNFAState();
+
+ for (StreamRecord<Event> event : events) {
+ Collection<Map<String, List<Event>>> pendingMatches =
+ nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp()).f0;
+ resultingPendingMatches.addAll(pendingMatches);
+ nfa.process(
+ sharedBufferAccessor,
+ nfaState,
+ event.getValue(),
+ event.getTimestamp(),
+ AfterMatchSkipStrategy.noSkip(),
+ new TestTimerService());
+ }
+
+ assertEquals(2, resultingPendingMatches.size());
+ assertEquals(expectedPendingMatches.size(), resultingPendingMatches.size());
+ assertEquals(expectedPendingMatches, resultingPendingMatches);
+ }
+
@Test
public void testBranchingPattern() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 1e9731d5b92..fe784d7b6a8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -210,7 +210,7 @@ public class NFAStatusChangeITCase {
// be removed from eventSharedBuffer as the timeout happens
nfaState.resetStateChanged();
Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults =
- nfa.advanceTime(sharedBufferAccessor, nfaState, 12L);
+ nfa.advanceTime(sharedBufferAccessor, nfaState, 12L).f1;
assertTrue(
"NFA status should change as timeout happens",
nfaState.isStateChanged() && !timeoutResults.isEmpty());
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 1144511e112..0150c04acc0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.TestLogger;
@@ -1509,4 +1510,120 @@ public class NotPatternITCase extends TestLogger {
return feedNFA(inputEvents, nfa);
}
+
+ @Test
+ public void testNotFollowedByWithInAtEnd() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(41, "b", 2.0);
+ Event a2 = new Event(42, "a", 3.0);
+ Event c = new Event(43, "c", 4.0);
+ Event b2 = new Event(44, "b", 5.0);
+ Event a3 = new Event(46, "a", 7.0);
+ Event b3 = new Event(47, "b", 8.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(a2, 4));
+ inputEvents.add(new StreamRecord<>(c, 5));
+ inputEvents.add(new StreamRecord<>(b2, 10));
+ inputEvents.add(new StreamRecord<>(a3, 11));
+ inputEvents.add(new StreamRecord<>(b3, 13));
+
+ Pattern<Event, ?> pattern =
+ Pattern.<Event>begin("a")
+ .where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("b")
+ .where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ })
+ .within(Time.milliseconds(3));
+
+ NFA<Event> nfa = compile(pattern, false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ comparePatterns(matches, Lists.<List<Event>>newArrayList(Lists.newArrayList(a2)));
+ }
+
+ @Test
+ public void testNotFollowByBeforeTimesWithIn() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a1 = new Event(40, "a", 1.0);
+ Event b1 = new Event(41, "b", 2.0);
+ Event a2 = new Event(42, "a", 3.0);
+ Event c1 = new Event(43, "c", 4.0);
+ Event c2 = new Event(44, "c", 5.0);
+ Event a3 = new Event(46, "a", 7.0);
+ Event c3 = new Event(47, "c", 8.0);
+ Event c4 = new Event(47, "c", 8.0);
+
+ inputEvents.add(new StreamRecord<>(a1, 1));
+ inputEvents.add(new StreamRecord<>(b1, 2));
+ inputEvents.add(new StreamRecord<>(a2, 10));
+ inputEvents.add(new StreamRecord<>(c1, 11));
+ inputEvents.add(new StreamRecord<>(c2, 12));
+ inputEvents.add(new StreamRecord<>(a3, 20));
+ inputEvents.add(new StreamRecord<>(c3, 21));
+ inputEvents.add(new StreamRecord<>(c4, 24));
+
+ Pattern<Event, ?> pattern =
+ Pattern.<Event>begin("a")
+ .where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ })
+ .notFollowedBy("b")
+ .where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ })
+ .followedBy("c")
+ .where(
+ new SimpleCondition<Event>() {
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ })
+ .times(0, 2)
+ .within(Time.milliseconds(3));
+
+ NFA<Event> nfa = compile(pattern, false);
+
+ final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+ comparePatterns(
+ matches,
+ Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a1),
+ Lists.newArrayList(a2),
+ Lists.newArrayList(a2, c1),
+ Lists.newArrayList(a2, c1, c2),
+ Lists.newArrayList(a3),
+ Lists.newArrayList(a3, c4)));
+ }
}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 0f59106e904..9a9a7595380 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -100,7 +100,7 @@ public class NFACompilerTest extends TestLogger {
// adjust the rule
expectedException.expect(MalformedPatternException.class);
expectedException.expectMessage(
- "NotFollowedBy is not supported as a last part of a Pattern!");
+ "NotFollowedBy is not supported without windowTime as a last part of a Pattern!");
Pattern<Event, ?> invalidPattern =
Pattern.<Event>begin("start")
@@ -182,6 +182,59 @@ public class NFACompilerTest extends TestLogger {
assertEquals(0, endingState.getStateTransitions().size());
}
+ @Test
+ public void testNFACompilerPatternNotFollowedByWithIn() {
+ Pattern<Event, Event> pattern =
+ Pattern.<Event>begin("start")
+ .where(startFilter)
+ .notFollowedBy("middle")
+ .where(endFilter)
+ .within(Time.milliseconds(1));
+
+ NFA<Event> nfa = compile(pattern, false);
+
+ Collection<State<Event>> states = nfa.getStates();
+ assertEquals(4, states.size());
+
+ Map<String, State<Event>> stateMap = new HashMap<>();
+ for (State<Event> state : states) {
+ stateMap.put(state.getName(), state);
+ }
+
+ assertTrue(stateMap.containsKey("start"));
+ State<Event> startState = stateMap.get("start");
+ assertTrue(startState.isStart());
+ final Set<Tuple2<String, StateTransitionAction>> startTransitions =
+ unfoldTransitions(startState);
+ assertEquals(
+ Sets.newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE)), startTransitions);
+
+ assertTrue(stateMap.containsKey("middle"));
+ State<Event> middleState = stateMap.get("middle");
+ assertTrue(middleState.isPending());
+ final Set<Tuple2<String, StateTransitionAction>> middleTransitions =
+ unfoldTransitions(middleState);
+ assertEquals(
+ Sets.newHashSet(
+ Tuple2.of("middle", StateTransitionAction.IGNORE),
+ Tuple2.of("middle:0", StateTransitionAction.PROCEED)),
+ middleTransitions);
+
+ assertTrue(stateMap.containsKey("middle:0"));
+ State<Event> middle0State = stateMap.get("middle:0");
+ assertTrue(middle0State.isStop());
+ final Set<Tuple2<String, StateTransitionAction>> middle0Transitions =
+ unfoldTransitions(middle0State);
+ assertEquals(
+ Sets.newHashSet(Tuple2.of("middle:0", StateTransitionAction.TAKE)),
+ middle0Transitions);
+
+ assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+ State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+ assertTrue(endingState.isFinal());
+ assertEquals(0, endingState.getStateTransitions().size());
+ }
+
@Test
public void testNoUnnecessaryStateCopiesCreated() {
final Pattern<Event, Event> pattern =
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
index 1cba9ff39f0..bc1a51cc5e2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -99,14 +99,18 @@ public final class NFATestHarness {
public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent)
throws Exception {
try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
- nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
- return nfa.process(
- sharedBufferAccessor,
- nfaState,
- inputEvent.getValue(),
- inputEvent.getTimestamp(),
- afterMatchSkipStrategy,
- timerService);
+ Collection<Map<String, List<Event>>> pendingMatches =
+ nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp()).f0;
+ Collection<Map<String, List<Event>>> matchedPatterns =
+ nfa.process(
+ sharedBufferAccessor,
+ nfaState,
+ inputEvent.getValue(),
+ inputEvent.getTimestamp(),
+ afterMatchSkipStrategy,
+ timerService);
+ matchedPatterns.addAll(pendingMatches);
+ return matchedPatterns;
}
}