You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2022/06/17 01:10:29 UTC

[flink] branch master updated: [FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 84a0a9dd121 [FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined
84a0a9dd121 is described below

commit 84a0a9dd1213581a7e9e4e34f93cec0a70b4424c
Author: mayue.fight <ma...@bytedance.com>
AuthorDate: Thu Mar 31 14:53:29 2022 +0800

    [FLINK-26941][cep] Support Pattern end with notFollowedBy in case time constraint is defined
    
    This closes #19295.
---
 docs/content.zh/docs/libs/cep.md                   |  31 +++++-
 docs/content/docs/libs/cep.md                      |  31 +++++-
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |  27 +++--
 .../main/java/org/apache/flink/cep/nfa/State.java  |   5 +
 .../apache/flink/cep/nfa/compiler/NFACompiler.java |  24 ++++-
 .../org/apache/flink/cep/operator/CepOperator.java |  14 ++-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |  70 +++++++++++-
 .../flink/cep/nfa/NFAStatusChangeITCase.java       |   2 +-
 .../org/apache/flink/cep/nfa/NotPatternITCase.java | 117 +++++++++++++++++++++
 .../flink/cep/nfa/compiler/NFACompilerTest.java    |  55 +++++++++-
 .../org/apache/flink/cep/utils/NFATestHarness.java |  20 ++--
 11 files changed, 366 insertions(+), 30 deletions(-)

diff --git a/docs/content.zh/docs/libs/cep.md b/docs/content.zh/docs/libs/cep.md
index 0e1c970b804..73e50b9746f 100644
--- a/docs/content.zh/docs/libs/cep.md
+++ b/docs/content.zh/docs/libs/cep.md
@@ -621,7 +621,7 @@ val start : Pattern[Event, _] = Pattern.begin("start")
 2. `notFollowedBy()`,如果不想一个特定事件发生在两个事件之间的任何地方。
 
 {{< hint warning >}}
-模式序列不能以`notFollowedBy()`结尾。
+如果模式序列没有定义时间约束,则不能以 `notFollowedBy()` 结尾。
 {{< /hint >}}
 
 {{< hint warning >}}
@@ -701,6 +701,35 @@ next.within(Time.seconds(10))
 {{< /tab >}}
 {{< /tabs >}}
 
+注意定义过时间约束的模式允许以 `notFollowedBy()` 结尾。
+例如,可以定义如下的模式:
+
+{{< tabs "df27eb6d-c532-430a-b56f-98ad4082e6d5" >}}
+{{< tab "Java" >}}
+```java
+Pattern.<Event>begin("start")
+    .next("middle").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("a");
+    }
+}).notFollowedBy("end").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("b");
+    }
+}).within(Time.seconds(10));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+Pattern.begin("start").where(_.getName().equals("a"))
+.notFollowedBy("end").where(_.getName == "b")
+.within(Time.seconds(10))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 #### 循环模式中的连续性
 
 你可以在循环模式中使用和前面[章节](#组合模式)讲过的同样的连续性。
diff --git a/docs/content/docs/libs/cep.md b/docs/content/docs/libs/cep.md
index fc541e2afd9..c6bb7abf23a 100644
--- a/docs/content/docs/libs/cep.md
+++ b/docs/content/docs/libs/cep.md
@@ -636,7 +636,7 @@ or
 2. `notFollowedBy()`, if you do not want an event type to be anywhere between two other event types.
 
 {{< hint warning >}}
-A pattern sequence cannot end in `notFollowedBy()`.
+A pattern sequence cannot end with `notFollowedBy()` if the time interval is not defined via `withIn()`.
 {{< /hint >}}
 
 {{< hint warning >}}
@@ -718,6 +718,35 @@ next.within(Time.seconds(10))
 {{< /tab >}}
 {{< /tabs >}}
 
+Notice that a pattern sequence can end with `notFollowedBy()` with temporal constraint
+E.g. a pattern like:
+
+{{< tabs "df27eb6d-c532-430a-b56f-98ad4082e6d5" >}}
+{{< tab "Java" >}}
+```java
+Pattern.<Event>begin("start")
+    .next("middle").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("a");
+    }
+}).notFollowedBy("end").where(new SimpleCondition<Event>() {
+    @Override
+    public boolean filter(Event value) throws Exception {
+        return value.getName().equals("b");
+    }
+}).within(Time.seconds(10));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+Pattern.begin("start").where(_.getName().equals("a"))
+.notFollowedBy("end").where(_.getName == "b")
+.within(Time.seconds(10))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
 #### Contiguity within looping patterns
 
 You can apply the same contiguity condition as discussed in the previous [section](#combining-patterns) within a looping pattern.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 1e8a90b6737..3f63a8f3e72 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -257,23 +257,30 @@ public class NFA<T> {
      * @param nfaState The NFAState object that we need to affect while processing
      * @param timestamp timestamp that indicates that there will be no more events with lower
      *     timestamp
-     * @return all timed outed partial matches
+     * @return all pending matches and timed outed partial matches
      * @throws Exception Thrown if the system cannot access the state.
      */
-    public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
-            final SharedBufferAccessor<T> sharedBufferAccessor,
-            final NFAState nfaState,
-            final long timestamp)
-            throws Exception {
-
+    public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>>
+            advanceTime(
+                    final SharedBufferAccessor<T> sharedBufferAccessor,
+                    final NFAState nfaState,
+                    final long timestamp)
+                    throws Exception {
+
+        final Collection<Map<String, List<T>>> pendingMatches = new ArrayList<>();
         final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
         final PriorityQueue<ComputationState> newPartialMatches =
                 new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
         for (ComputationState computationState : nfaState.getPartialMatches()) {
             if (isStateTimedOut(computationState, timestamp)) {
-
-                if (handleTimeout) {
+                if (getState(computationState).isPending()) {
+                    // extract the Pending State
+                    Map<String, List<T>> pendingPattern =
+                            sharedBufferAccessor.materializeMatch(
+                                    extractCurrentMatches(sharedBufferAccessor, computationState));
+                    pendingMatches.add(pendingPattern);
+                } else if (handleTimeout) {
                     // extract the timed out event pattern
                     Map<String, List<T>> timedOutPattern =
                             sharedBufferAccessor.materializeMatch(
@@ -297,7 +304,7 @@ public class NFA<T> {
 
         sharedBufferAccessor.advanceTime(timestamp);
 
-        return timeoutResult;
+        return Tuple2.of(pendingMatches, timeoutResult);
     }
 
     private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 5d9754ade09..d611e3e61bb 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -135,11 +135,16 @@ public class State<T> implements Serializable {
         return stateType == StateType.Stop;
     }
 
+    public boolean isPending() {
+        return stateType == StateType.Pending;
+    }
+
     /** Set of valid state types. */
     public enum StateType {
         Start, // the state is a starting state for the NFA
         Final, // the state is a final state for the NFA
         Normal, // the state is neither a start nor a final state
+        Pending, // the state is pending and waiting for timeout handling
         Stop
     }
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 0e9ed184557..15b2a7cea0c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -157,11 +157,8 @@ public class NFACompiler {
          * create multiple NFAs.
          */
         void compileFactory() {
-            if (currentPattern.getQuantifier().getConsumingStrategy()
-                    == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
-                throw new MalformedPatternException(
-                        "NotFollowedBy is not supported as a last part of a Pattern!");
-            }
+
+            Pattern<T, ?> lastPattern = currentPattern;
 
             checkPatternNameUniqueness();
 
@@ -174,6 +171,13 @@ public class NFACompiler {
             sinkState = createMiddleStates(sinkState);
             // add the beginning state
             createStartState(sinkState);
+
+            if (lastPattern.getQuantifier().getConsumingStrategy()
+                            == Quantifier.ConsumingStrategy.NOT_FOLLOW
+                    && getWindowTime() == 0) {
+                throw new MalformedPatternException(
+                        "NotFollowedBy is not supported without windowTime as a last part of a Pattern!");
+            }
         }
 
         AfterMatchSkipStrategy getAfterMatchSkipStrategy() {
@@ -304,6 +308,16 @@ public class NFACompiler {
                 if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
                     // skip notFollow patterns, they are converted into edge conditions
+                    if (getWindowTime() > 0 && lastSink.isFinal()) {
+                        final State<T> notFollow =
+                                createState(currentPattern.getName(), State.StateType.Pending);
+                        final IterativeCondition<T> notCondition = getTakeCondition(currentPattern);
+                        final State<T> stopState =
+                                createStopState(notCondition, currentPattern.getName());
+                        notFollow.addProceed(stopState, notCondition);
+                        notFollow.addIgnore(new RichNotCondition<>(notCondition));
+                        lastSink = notFollow;
+                    }
                 } else if (currentPattern.getQuantifier().getConsumingStrategy()
                         == Quantifier.ConsumingStrategy.NOT_NEXT) {
                     final State<T> notNext =
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index d1cfd537bd9..b7ea2373a59 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -422,8 +422,18 @@ public class CepOperator<IN, KEY, OUT>
      */
     private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
         try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
-            Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
-                    nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+            Tuple2<
+                            Collection<Map<String, List<IN>>>,
+                            Collection<Tuple2<Map<String, List<IN>>, Long>>>
+                    pendingMatchesAndTimeout =
+                            nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+
+            Collection<Map<String, List<IN>>> pendingMatches = pendingMatchesAndTimeout.f0;
+            Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut = pendingMatchesAndTimeout.f1;
+
+            if (!pendingMatches.isEmpty()) {
+                processMatchedSequences(pendingMatches, timestamp);
+            }
             if (!timedOut.isEmpty()) {
                 processTimedOutSequences(timedOut);
             }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 175014bbb67..4bd899da353 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -477,7 +477,7 @@ public class NFAITCase extends TestLogger {
         for (StreamRecord<Event> event : events) {
 
             Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
-                    nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
+                    nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp()).f1;
             Collection<Map<String, List<Event>>> matchedPatterns =
                     nfa.process(
                             sharedBufferAccessor,
@@ -497,6 +497,74 @@ public class NFAITCase extends TestLogger {
         assertEquals(expectedTimeoutPatterns, resultingTimeoutPatterns);
     }
 
+    @Test
+    public void testPendingStateMatches() throws Exception {
+        List<StreamRecord<Event>> events = new ArrayList<>();
+        Set<Map<String, List<Event>>> resultingPendingMatches = new HashSet<>();
+        Set<Map<String, List<Event>>> expectedPendingMatches = new HashSet<>();
+
+        events.add(new StreamRecord<>(new Event(1, "start", 1.0), 1));
+        events.add(new StreamRecord<>(new Event(2, "middle", 1.0), 4));
+        events.add(new StreamRecord<>(new Event(3, "start", 1.0), 5));
+        events.add(new StreamRecord<>(new Event(4, "start", 1.0), 11));
+        events.add(new StreamRecord<>(new Event(5, "middle", 1.0), 18));
+
+        Map<String, List<Event>> pendingMatches1 = new HashMap<>();
+        pendingMatches1.put("start", Collections.singletonList(new Event(3, "start", 1.0)));
+
+        Map<String, List<Event>> pendingMatches2 = new HashMap<>();
+        pendingMatches2.put("start", Collections.singletonList(new Event(4, "start", 1.0)));
+
+        expectedPendingMatches.add(pendingMatches1);
+        expectedPendingMatches.add(pendingMatches2);
+
+        Pattern<Event, ?> pattern =
+                Pattern.<Event>begin("start")
+                        .where(
+                                new SimpleCondition<Event>() {
+                                    private static final long serialVersionUID =
+                                            7907391379273505897L;
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("start");
+                                    }
+                                })
+                        .notFollowedBy("middle")
+                        .where(
+                                new SimpleCondition<Event>() {
+                                    private static final long serialVersionUID =
+                                            -3268741540234334074L;
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("middle");
+                                    }
+                                })
+                        .within(Time.milliseconds(5));
+
+        NFA<Event> nfa = compile(pattern, true);
+
+        NFAState nfaState = nfa.createInitialNFAState();
+
+        for (StreamRecord<Event> event : events) {
+            Collection<Map<String, List<Event>>> pendingMatches =
+                    nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp()).f0;
+            resultingPendingMatches.addAll(pendingMatches);
+            nfa.process(
+                    sharedBufferAccessor,
+                    nfaState,
+                    event.getValue(),
+                    event.getTimestamp(),
+                    AfterMatchSkipStrategy.noSkip(),
+                    new TestTimerService());
+        }
+
+        assertEquals(2, resultingPendingMatches.size());
+        assertEquals(expectedPendingMatches.size(), resultingPendingMatches.size());
+        assertEquals(expectedPendingMatches, resultingPendingMatches);
+    }
+
     @Test
     public void testBranchingPattern() throws Exception {
         List<StreamRecord<Event>> inputEvents = new ArrayList<>();
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 1e9731d5b92..fe784d7b6a8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -210,7 +210,7 @@ public class NFAStatusChangeITCase {
         // be removed from eventSharedBuffer as the timeout happens
         nfaState.resetStateChanged();
         Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults =
-                nfa.advanceTime(sharedBufferAccessor, nfaState, 12L);
+                nfa.advanceTime(sharedBufferAccessor, nfaState, 12L).f1;
         assertTrue(
                 "NFA status should change as timeout happens",
                 nfaState.isStateChanged() && !timeoutResults.isEmpty());
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 1144511e112..0150c04acc0 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -1509,4 +1510,120 @@ public class NotPatternITCase extends TestLogger {
 
         return feedNFA(inputEvents, nfa);
     }
+
+    @Test
+    public void testNotFollowedByWithInAtEnd() throws Exception {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event a1 = new Event(40, "a", 1.0);
+        Event b1 = new Event(41, "b", 2.0);
+        Event a2 = new Event(42, "a", 3.0);
+        Event c = new Event(43, "c", 4.0);
+        Event b2 = new Event(44, "b", 5.0);
+        Event a3 = new Event(46, "a", 7.0);
+        Event b3 = new Event(47, "b", 8.0);
+
+        inputEvents.add(new StreamRecord<>(a1, 1));
+        inputEvents.add(new StreamRecord<>(b1, 2));
+        inputEvents.add(new StreamRecord<>(a2, 4));
+        inputEvents.add(new StreamRecord<>(c, 5));
+        inputEvents.add(new StreamRecord<>(b2, 10));
+        inputEvents.add(new StreamRecord<>(a3, 11));
+        inputEvents.add(new StreamRecord<>(b3, 13));
+
+        Pattern<Event, ?> pattern =
+                Pattern.<Event>begin("a")
+                        .where(
+                                new SimpleCondition<Event>() {
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("a");
+                                    }
+                                })
+                        .notFollowedBy("b")
+                        .where(
+                                new SimpleCondition<Event>() {
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("b");
+                                    }
+                                })
+                        .within(Time.milliseconds(3));
+
+        NFA<Event> nfa = compile(pattern, false);
+
+        final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+        comparePatterns(matches, Lists.<List<Event>>newArrayList(Lists.newArrayList(a2)));
+    }
+
+    @Test
+    public void testNotFollowByBeforeTimesWithIn() throws Exception {
+        List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+        Event a1 = new Event(40, "a", 1.0);
+        Event b1 = new Event(41, "b", 2.0);
+        Event a2 = new Event(42, "a", 3.0);
+        Event c1 = new Event(43, "c", 4.0);
+        Event c2 = new Event(44, "c", 5.0);
+        Event a3 = new Event(46, "a", 7.0);
+        Event c3 = new Event(47, "c", 8.0);
+        Event c4 = new Event(47, "c", 8.0);
+
+        inputEvents.add(new StreamRecord<>(a1, 1));
+        inputEvents.add(new StreamRecord<>(b1, 2));
+        inputEvents.add(new StreamRecord<>(a2, 10));
+        inputEvents.add(new StreamRecord<>(c1, 11));
+        inputEvents.add(new StreamRecord<>(c2, 12));
+        inputEvents.add(new StreamRecord<>(a3, 20));
+        inputEvents.add(new StreamRecord<>(c3, 21));
+        inputEvents.add(new StreamRecord<>(c4, 24));
+
+        Pattern<Event, ?> pattern =
+                Pattern.<Event>begin("a")
+                        .where(
+                                new SimpleCondition<Event>() {
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("a");
+                                    }
+                                })
+                        .notFollowedBy("b")
+                        .where(
+                                new SimpleCondition<Event>() {
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("b");
+                                    }
+                                })
+                        .followedBy("c")
+                        .where(
+                                new SimpleCondition<Event>() {
+
+                                    @Override
+                                    public boolean filter(Event value) throws Exception {
+                                        return value.getName().equals("c");
+                                    }
+                                })
+                        .times(0, 2)
+                        .within(Time.milliseconds(3));
+
+        NFA<Event> nfa = compile(pattern, false);
+
+        final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+        comparePatterns(
+                matches,
+                Lists.<List<Event>>newArrayList(
+                        Lists.newArrayList(a1),
+                        Lists.newArrayList(a2),
+                        Lists.newArrayList(a2, c1),
+                        Lists.newArrayList(a2, c1, c2),
+                        Lists.newArrayList(a3),
+                        Lists.newArrayList(a3, c4)));
+    }
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 0f59106e904..9a9a7595380 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -100,7 +100,7 @@ public class NFACompilerTest extends TestLogger {
         // adjust the rule
         expectedException.expect(MalformedPatternException.class);
         expectedException.expectMessage(
-                "NotFollowedBy is not supported as a last part of a Pattern!");
+                "NotFollowedBy is not supported without windowTime as a last part of a Pattern!");
 
         Pattern<Event, ?> invalidPattern =
                 Pattern.<Event>begin("start")
@@ -182,6 +182,59 @@ public class NFACompilerTest extends TestLogger {
         assertEquals(0, endingState.getStateTransitions().size());
     }
 
+    @Test
+    public void testNFACompilerPatternNotFollowedByWithIn() {
+        Pattern<Event, Event> pattern =
+                Pattern.<Event>begin("start")
+                        .where(startFilter)
+                        .notFollowedBy("middle")
+                        .where(endFilter)
+                        .within(Time.milliseconds(1));
+
+        NFA<Event> nfa = compile(pattern, false);
+
+        Collection<State<Event>> states = nfa.getStates();
+        assertEquals(4, states.size());
+
+        Map<String, State<Event>> stateMap = new HashMap<>();
+        for (State<Event> state : states) {
+            stateMap.put(state.getName(), state);
+        }
+
+        assertTrue(stateMap.containsKey("start"));
+        State<Event> startState = stateMap.get("start");
+        assertTrue(startState.isStart());
+        final Set<Tuple2<String, StateTransitionAction>> startTransitions =
+                unfoldTransitions(startState);
+        assertEquals(
+                Sets.newHashSet(Tuple2.of("middle", StateTransitionAction.TAKE)), startTransitions);
+
+        assertTrue(stateMap.containsKey("middle"));
+        State<Event> middleState = stateMap.get("middle");
+        assertTrue(middleState.isPending());
+        final Set<Tuple2<String, StateTransitionAction>> middleTransitions =
+                unfoldTransitions(middleState);
+        assertEquals(
+                Sets.newHashSet(
+                        Tuple2.of("middle", StateTransitionAction.IGNORE),
+                        Tuple2.of("middle:0", StateTransitionAction.PROCEED)),
+                middleTransitions);
+
+        assertTrue(stateMap.containsKey("middle:0"));
+        State<Event> middle0State = stateMap.get("middle:0");
+        assertTrue(middle0State.isStop());
+        final Set<Tuple2<String, StateTransitionAction>> middle0Transitions =
+                unfoldTransitions(middle0State);
+        assertEquals(
+                Sets.newHashSet(Tuple2.of("middle:0", StateTransitionAction.TAKE)),
+                middle0Transitions);
+
+        assertTrue(stateMap.containsKey(NFACompiler.ENDING_STATE_NAME));
+        State<Event> endingState = stateMap.get(NFACompiler.ENDING_STATE_NAME);
+        assertTrue(endingState.isFinal());
+        assertEquals(0, endingState.getStateTransitions().size());
+    }
+
     @Test
     public void testNoUnnecessaryStateCopiesCreated() {
         final Pattern<Event, Event> pattern =
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
index 1cba9ff39f0..bc1a51cc5e2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -99,14 +99,18 @@ public final class NFATestHarness {
     public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent)
             throws Exception {
         try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-            nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
-            return nfa.process(
-                    sharedBufferAccessor,
-                    nfaState,
-                    inputEvent.getValue(),
-                    inputEvent.getTimestamp(),
-                    afterMatchSkipStrategy,
-                    timerService);
+            Collection<Map<String, List<Event>>> pendingMatches =
+                    nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp()).f0;
+            Collection<Map<String, List<Event>>> matchedPatterns =
+                    nfa.process(
+                            sharedBufferAccessor,
+                            nfaState,
+                            inputEvent.getValue(),
+                            inputEvent.getTimestamp(),
+                            afterMatchSkipStrategy,
+                            timerService);
+            matchedPatterns.addAll(pendingMatches);
+            return matchedPatterns;
         }
     }