You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/05/04 15:51:46 UTC

flink git commit: [FLINK-6445] [cep] Fix NPE in no-condition patterns.

Repository: flink
Updated Branches:
  refs/heads/master a2ec3ee66 -> d6435e87c


[FLINK-6445] [cep] Fix NPE in no-condition patterns.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d6435e87
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d6435e87
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d6435e87

Branch: refs/heads/master
Commit: d6435e87cd4c58dfa26c2acf10474d7eb7c46f57
Parents: a2ec3ee
Author: kl0u <kk...@gmail.com>
Authored: Thu May 4 15:33:40 2017 +0200
Committer: kl0u <kk...@gmail.com>
Committed: Thu May 4 15:58:47 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/cep/pattern/Pattern.java   |  7 ++-
 .../cep/pattern/conditions/AndCondition.java    |  6 +-
 .../cep/pattern/conditions/NotCondition.java    |  2 +-
 .../cep/pattern/conditions/OrCondition.java     |  6 +-
 .../pattern/conditions/SubtypeCondition.java    |  4 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 66 ++++++++++++++++++++
 6 files changed, 84 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index cef0f85..b100bc5 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -127,8 +127,9 @@ public class Pattern<T, F extends T> {
 	 * @return The pattern with the new condition is set.
 	 */
 	public Pattern<T, F> where(IterativeCondition<F> condition) {
-		ClosureCleaner.clean(condition, true);
+		Preconditions.checkNotNull(condition, "The condition cannot be null.");
 
+		ClosureCleaner.clean(condition, true);
 		if (this.condition == null) {
 			this.condition = condition;
 		} else {
@@ -148,6 +149,8 @@ public class Pattern<T, F extends T> {
 	 * @return The pattern with the new condition is set.
 	 */
 	public Pattern<T, F> or(IterativeCondition<F> condition) {
+		Preconditions.checkNotNull(condition, "The condition cannot be null.");
+
 		ClosureCleaner.clean(condition, true);
 
 		if (this.condition == null) {
@@ -167,6 +170,8 @@ public class Pattern<T, F extends T> {
 	 * @return The same pattern with the new subtype constraint
 	 */
 	public <S extends F> Pattern<T, S> subtype(final Class<S> subtypeClass) {
+		Preconditions.checkNotNull(subtypeClass, "The class cannot be null.");
+
 		if (condition == null) {
 			this.condition = new SubtypeCondition<F>(subtypeClass);
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
index 5df7c66..ac34c41 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/AndCondition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A {@link IterativeCondition condition} which combines two conditions with a logical
  * {@code AND} and returns {@code true} if both are {@code true}.
@@ -32,8 +34,8 @@ public class AndCondition<T> extends IterativeCondition<T> {
 	private final IterativeCondition<T> right;
 
 	public AndCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
-		this.left = left;
-		this.right = right;
+		this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
+		this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
index 3e6ab56..9318c2f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/NotCondition.java
@@ -35,6 +35,6 @@ public class NotCondition<T> extends IterativeCondition<T> {
 
 	@Override
 	public boolean filter(T value, Context<T> ctx) throws Exception {
-		return !original.filter(value, ctx);
+		return original != null && !original.filter(value, ctx);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
index 6aaa4bb..d3690ab 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/OrCondition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A {@link IterativeCondition condition} which combines two conditions with a logical
  * {@code OR} and returns {@code true} if at least one is {@code true}.
@@ -32,8 +34,8 @@ public class OrCondition<T> extends IterativeCondition<T> {
 	private final IterativeCondition<T> right;
 
 	public OrCondition(final IterativeCondition<T> left, final IterativeCondition<T> right) {
-		this.left = left;
-		this.right = right;
+		this.left = Preconditions.checkNotNull(left, "The condition cannot be null.");
+		this.right = Preconditions.checkNotNull(right, "The condition cannot be null.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
index 91f6c21..cff8693 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/SubtypeCondition.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.util.Preconditions;
+
 /**
  * A {@link IterativeCondition condition} which filters elements of the given type.
  * An element is filtered out iff it is not assignable to the given subtype of {@code T}.
@@ -31,7 +33,7 @@ public class SubtypeCondition<T> extends SimpleCondition<T> {
 	private final Class<? extends T> subtype;
 
 	public SubtypeCondition(final Class<? extends T> subtype) {
-		this.subtype = subtype;
+		this.subtype = Preconditions.checkNotNull(subtype, "The subtype cannot be null.");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d6435e87/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 4a00c1e..fe31564 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -51,6 +51,72 @@ import static org.junit.Assert.assertEquals;
 public class NFAITCase extends TestLogger {
 
 	@Test
+	public void testNoConditionNFA() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a = new Event(40, "a", 1.0);
+		Event b = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event d = new Event(43, "d", 4.0);
+		Event e = new Event(44, "e", 5.0);
+
+		inputEvents.add(new StreamRecord<>(a, 1));
+		inputEvents.add(new StreamRecord<>(b, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(e, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end");
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(a, b),
+				Lists.newArrayList(b, c),
+				Lists.newArrayList(c, d),
+				Lists.newArrayList(d, e)
+		));
+	}
+
+	@Test
+	public void testAnyWithNoConditionNFA() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a = new Event(40, "a", 1.0);
+		Event b = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event d = new Event(43, "d", 4.0);
+		Event e = new Event(44, "e", 5.0);
+
+		inputEvents.add(new StreamRecord<>(a, 1));
+		inputEvents.add(new StreamRecord<>(b, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(e, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedByAny("end");
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(a, b),
+				Lists.newArrayList(a, c),
+				Lists.newArrayList(a, d),
+				Lists.newArrayList(a, e),
+				Lists.newArrayList(b, c),
+				Lists.newArrayList(b, d),
+				Lists.newArrayList(b, e),
+				Lists.newArrayList(c, d),
+				Lists.newArrayList(c, e),
+				Lists.newArrayList(d, e)
+		));
+	}
+
+	@Test
 	public void testSimplePatternNFA() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();