You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/07/05 09:56:30 UTC
[3/3] flink git commit: [FLINK-7061] [cep] Fix quantifier range starting from 0
[FLINK-7061] [cep] Fix quantifier range starting from 0
This closes #4242
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3fc96cd1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3fc96cd1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3fc96cd1
Branch: refs/heads/master
Commit: 3fc96cd1f9564a60ba5ec7f06a1fec4ab173b200
Parents: 3096bd0
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Sun Jul 2 13:11:05 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jul 5 11:53:59 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/cep/pattern/Pattern.java | 1 +
.../apache/flink/cep/pattern/Quantifier.java | 3 +-
.../apache/flink/cep/nfa/TimesRangeITCase.java | 51 ++++++++++++++++++++
.../apache/flink/cep/pattern/PatternTest.java | 10 ++++
4 files changed, 63 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index f4d3404..2ffbc41 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -368,6 +368,7 @@ public class Pattern<T, F extends T> {
this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
if (from == 0) {
this.quantifier.optional();
+ from = 1;
}
this.times = Times.of(from, to);
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index c1893b4..9192a13 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -153,9 +153,8 @@ public class Quantifier {
private final int to;
private Times(int from, int to) {
- Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
+ Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
Preconditions.checkArgument(to >= from, "The to should be a number greater than or equal to from: " + from + ".");
- Preconditions.checkArgument(from != to || from != 0, "The from and to should not be both equal to 0.");
this.from = from;
this.to = to;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 4305fa2..37a9534 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -92,6 +92,57 @@ public class TimesRangeITCase extends TestLogger {
}
@Test
+ public void testTimesRangeFromZero() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event startEvent = new Event(40, "c", 1.0);
+ Event middleEvent1 = new Event(41, "a", 2.0);
+ Event middleEvent2 = new Event(42, "a", 3.0);
+ Event middleEvent3 = new Event(43, "a", 4.0);
+ Event end1 = new Event(44, "b", 5.0);
+
+ inputEvents.add(new StreamRecord<>(startEvent, 1));
+ inputEvents.add(new StreamRecord<>(middleEvent1, 2));
+ inputEvents.add(new StreamRecord<>(middleEvent2, 3));
+ inputEvents.add(new StreamRecord<>(middleEvent3, 4));
+ inputEvents.add(new StreamRecord<>(end1, 6));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("c");
+ }
+ }).next("middle").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("a");
+ }
+ }).times(0, 2).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
+ private static final long serialVersionUID = 5726188262756267490L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ return value.getName().equals("b");
+ }
+ });
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+ Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+ Lists.newArrayList(startEvent, middleEvent1, end1),
+ Lists.newArrayList(startEvent, end1)
+ ));
+ }
+
+ @Test
public void testTimesRangeNonStrict() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/3fc96cd1/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 999e5f3..6d93ff3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -195,6 +195,16 @@ public class PatternTest extends TestLogger {
assertEquals(previous2.getName(), "start");
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testPatternTimesNegativeTimes() throws Exception {
+ Pattern.begin("start").where(dummyCondition()).times(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPatternTimesNegativeFrom() throws Exception {
+ Pattern.begin("start").where(dummyCondition()).times(-1, 2);
+ }
+
@Test(expected = MalformedPatternException.class)
public void testPatternCanHaveQuantifierSpecifiedOnce1() throws Exception {