You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/04 15:51:46 UTC
flink git commit: [FLINK-6445] [cep] Fix NPE in no-condition patterns.
Repository: flink
Updated Branches:
refs/heads/master a2ec3ee66 -> d6435e87c
[FLINK-6445] [cep] Fix NPE in no-condition patterns.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6435e87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6435e87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6435e87
Branch: refs/heads/master
Commit: d6435e87cd4c58dfa26c2acf10474d7eb7c46f57
Parents: a2ec3ee
Author: kl0u <kk...@gmail.com>
Authored: Thu May 4 15:33:40 2017 +0200
Committer: kl0u <kk...@gmail.com>
Committed: Thu May 4 15:58:47 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/cep/pattern/Pattern.java | 7 ++-
.../cep/pattern/conditions/AndCondition.java | 6 +-
.../cep/pattern/conditions/NotCondition.java | 2 +-
.../cep/pattern/conditions/OrCondition.java | 6 +-
.../pattern/conditions/SubtypeCondition.java | 4 +-
.../org/apache/flink/cep/nfa/NFAITCase.java | 66 ++++++++++++++++++++
6 files changed, 84 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index cef0f85..b100bc5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -127,8 +127,9 @@ public class Pattern<T, F extends T> {
* @return The pattern with the new condition is set.
*/
public Pattern<T, F> where(IterativeCondition<F> condition) {
- ClosureCleaner.clean(condition, true);
+ Preconditions.checkNotNull(condition, "The condition cannot be null.");
+ ClosureCleaner.clean(condition, true);
if (this.condition == null) {
this.condition = condition;
} else {
@@ -148,6 +149,8 @@ public class Pattern<T, F extends T> {
* @return The pattern with the new condition is set.
*/
public Pattern<T, F> or(IterativeCondition<F> condition) {
+ Preconditions.checkNotNull(condition, "The condition cannot be null.");
+
ClosureCleaner.clean(condition, true);
if (this.condition == null) {
@@ -167,6 +170,8 @@ public class Pattern<T, F extends T> {
* @return The same pattern with the new subtype constraint
*/
public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
+ Preconditions.checkNotNull(subtypeClass, "The class cannot be null.");
+
if (condition == null) {
this.condition = new SubtypeCondition<F>(subtypeClass);
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
index 5df7c66..ac34c41 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.util.Preconditions;
+
/**
* A {@link IterativeCondition condition} which combines two conditions with a logical
* {@code AND} and returns {@code true} if both are {@code true}.
@@ -32,8 +34,8 @@ public class AndCondition<T> extends IterativeCondition<T> {
private final IterativeCondition<T> right;
public AndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
- this.left = left;
- this.right = right;
+ this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
+ this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
index 3e6ab56..9318c2f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -35,6 +35,6 @@ public class NotCondition<T> extends IterativeCondition<T> {
@Override
public boolean filter(T value, Context<T> ctx) throws Exception {
- return !original.filter(value, ctx);
+ return original != null && !original.filter(value, ctx);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
index 6aaa4bb..d3690ab 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.util.Preconditions;
+
/**
* A {@link IterativeCondition condition} which combines two conditions with a logical
* {@code OR} and returns {@code true} if at least one is {@code true}.
@@ -32,8 +34,8 @@ public class OrCondition<T> extends IterativeCondition<T> {
private final IterativeCondition<T> right;
public OrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
- this.left = left;
- this.right = right;
+ this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
+ this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
index 91f6c21..cff8693 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -18,6 +18,8 @@
package org.apache.flink.cep.pattern.conditions;
+import org.apache.flink.util.Preconditions;
+
/**
* A {@link IterativeCondition condition} which filters elements of the given type.
* An element is filtered out iff it is not assignable to the given subtype of {@code T}.
@@ -31,7 +33,7 @@ public class SubtypeCondition<T> extends SimpleCondition<T> {
private final Class<? extends T> subtype;
public SubtypeCondition(final Class<? extends T> subtype) {
- this.subtype = subtype;
+ this.subtype = Preconditions.checkNotNull(subtype, "The subtype cannot be null.");
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 4a00c1e..fe31564 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -51,6 +51,72 @@ import static org.junit.Assert.assertEquals;
public class NFAITCase extends TestLogger {
@Test
+ public void testNoConditionNFA() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a = new Event(40, "a", 1.0);
+ Event b = new Event(41, "b", 2.0);
+ Event c = new Event(42, "c", 3.0);
+ Event d = new Event(43, "d", 4.0);
+ Event e = new Event(44, "e", 5.0);
+
+ inputEvents.add(new StreamRecord<>(a, 1));
+ inputEvents.add(new StreamRecord<>(b, 2));
+ inputEvents.add(new StreamRecord<>(c, 3));
+ inputEvents.add(new StreamRecord<>(d, 4));
+ inputEvents.add(new StreamRecord<>(e, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end");
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a, b),
+ Lists.newArrayList(b, c),
+ Lists.newArrayList(c, d),
+ Lists.newArrayList(d, e)
+ ));
+ }
+
+ @Test
+ public void testAnyWithNoConditionNFA() {
+ List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+ Event a = new Event(40, "a", 1.0);
+ Event b = new Event(41, "b", 2.0);
+ Event c = new Event(42, "c", 3.0);
+ Event d = new Event(43, "d", 4.0);
+ Event e = new Event(44, "e", 5.0);
+
+ inputEvents.add(new StreamRecord<>(a, 1));
+ inputEvents.add(new StreamRecord<>(b, 2));
+ inputEvents.add(new StreamRecord<>(c, 3));
+ inputEvents.add(new StreamRecord<>(d, 4));
+ inputEvents.add(new StreamRecord<>(e, 5));
+
+ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedByAny("end");
+
+ NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+ List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+ compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+ Lists.newArrayList(a, b),
+ Lists.newArrayList(a, c),
+ Lists.newArrayList(a, d),
+ Lists.newArrayList(a, e),
+ Lists.newArrayList(b, c),
+ Lists.newArrayList(b, d),
+ Lists.newArrayList(b, e),
+ Lists.newArrayList(c, d),
+ Lists.newArrayList(c, e),
+ Lists.newArrayList(d, e)
+ ));
+ }
+
+ @Test
public void testSimplePatternNFA() {
List<StreamRecord<Event>> inputEvents = new ArrayList<>();