You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/07/25 12:33:39 UTC
flink git commit: [FLINK-7170] [cep] Fix until condition when the
contiguity is strict
Repository: flink
Updated Branches:
refs/heads/master b13914799 -> 449c84b0e
[FLINK-7170] [cep] Fix until condition when the contiguity is strict
This closes #4318.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/449c84b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/449c84b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/449c84b0
Branch: refs/heads/master
Commit: 449c84b0e21c6cfe43ab9ed697a69fa09308b381
Parents: b139147
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Thu Jul 13 17:21:30 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jul 25 14:30:59 2017 +0200
----------------------------------------------------------------------
.../flink/cep/nfa/compiler/NFACompiler.java | 30 ++++++++-----
.../flink/cep/nfa/UntilConditionITCase.java | 47 ++++++++++++++++++++
2 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index e160e4a..62464d1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -630,10 +630,12 @@ public class NFACompiler {
final IterativeCondition<T> ignoreCondition = extendWithUntilCondition(
getInnerIgnoreCondition(currentPattern),
- untilCondition);
+ untilCondition,
+ false);
final IterativeCondition<T> takeCondition = extendWithUntilCondition(
getTakeCondition(currentPattern),
- untilCondition);
+ untilCondition,
+ true);
final IterativeCondition<T> proceedCondition = getTrueFunction();
final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
@@ -664,7 +666,8 @@ public class NFACompiler {
private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
final IterativeCondition<T> takeCondition = extendWithUntilCondition(
getTakeCondition(currentPattern),
- (IterativeCondition<T>) currentPattern.getUntilCondition()
+ (IterativeCondition<T>) currentPattern.getUntilCondition(),
+ true
);
final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
@@ -683,7 +686,8 @@ public class NFACompiler {
private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
final IterativeCondition<T> takeCondition = extendWithUntilCondition(
getTakeCondition(currentPattern),
- (IterativeCondition<T>) currentPattern.getUntilCondition()
+ (IterativeCondition<T>) currentPattern.getUntilCondition(),
+ true
);
final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
@@ -697,14 +701,16 @@ public class NFACompiler {
*
* @param condition the condition to extend
* @param untilCondition the until condition to join with the given condition
+ * @param isTakeCondition whether the {@code condition} is for {@code TAKE} edge
* @return condition with AND applied or the original condition
*/
private IterativeCondition<T> extendWithUntilCondition(
IterativeCondition<T> condition,
- IterativeCondition<T> untilCondition) {
+ IterativeCondition<T> untilCondition,
+ boolean isTakeCondition) {
if (untilCondition != null && condition != null) {
return new AndCondition<>(new NotCondition<>(untilCondition), condition);
- } else if (untilCondition != null) {
+ } else if (untilCondition != null && isTakeCondition) {
return new NotCondition<>(untilCondition);
}
@@ -741,7 +747,8 @@ public class NFACompiler {
if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
innerIgnoreCondition = extendWithUntilCondition(
innerIgnoreCondition,
- (IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+ (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+ false);
}
return innerIgnoreCondition;
}
@@ -781,7 +788,8 @@ public class NFACompiler {
if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
ignoreCondition = extendWithUntilCondition(
ignoreCondition,
- (IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+ (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+ false);
}
return ignoreCondition;
}
@@ -797,7 +805,8 @@ public class NFACompiler {
if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
takeCondition = extendWithUntilCondition(
takeCondition,
- (IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+ (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+ true);
}
return takeCondition;
}
@@ -811,7 +820,8 @@ public class NFACompiler {
if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
trueCondition = extendWithUntilCondition(
trueCondition,
- (IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+ (IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+ true);
}
return trueCondition;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index d56e883..639541d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -194,6 +194,53 @@ public class UntilConditionITCase {
}
@Test
+ public void testUntilConditionFollowedByOneOrMoreConsecutive2() throws Exception {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "b", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event breaking = new Event(45, "a", 5.0);
+ Event ignored = new Event(46, "a", 6.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+ inputEvents.add(new StreamRecord<>(breaking, 7));
+ inputEvents.add(new StreamRecord<>(ignored, 8));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).followedBy("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).oneOrMore().consecutive().until(UNTIL_CONDITION)
+ .followedBy("end").where(
+ UNTIL_CONDITION
+ );
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, breaking)
+ ));
+ assertTrue(nfa.isEmpty());
+ }
+
+ @Test
public void testUntilConditionFollowedByZeroOrMore() throws Exception {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();