You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/02/28 10:26:47 UTC

flink git commit: [FLINK-5871] [cep] Enforce uniqueness of pattern names in CEP.

Repository: flink
Updated Branches:
  refs/heads/master 1a062b796 -> a92d78746


[FLINK-5871] [cep] Enforce uniqueness of pattern names in CEP.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a92d7874
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a92d7874
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a92d7874

Branch: refs/heads/master
Commit: a92d78746318e9bbe949d9d030fc21b5348b56e2
Parents: 1a062b7
Author: kl0u <kk...@gmail.com>
Authored: Tue Feb 21 13:51:11 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Tue Feb 28 09:35:49 2017 +0100

----------------------------------------------------------------------
 .../nfa/compiler/MalformedPatternException.java | 32 ++++++++++++++++++
 .../flink/cep/nfa/compiler/NFACompiler.java     | 10 ++++++
 .../org/apache/flink/cep/pattern/Pattern.java   |  2 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java | 34 ++++++++++++++++++++
 4 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
new file mode 100644
index 0000000..a3bb5f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+/**
+ * An exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+	private static final long serialVersionUID = 7751134834983361543L;
+
+	public MalformedPatternException(String message) {
+		super(message);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 878e0b2..18ed21f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -34,6 +34,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
@@ -84,12 +85,16 @@ public class NFACompiler {
 			Map<String, State<T>> states = new HashMap<>();
 			long windowTime;
 
+			// this is used to enforce pattern name uniqueness.
+			Set<String> patternNames = new HashSet<>();
+
 			Pattern<T, ?> succeedingPattern;
 			State<T> succeedingState;
 			Pattern<T, ?> currentPattern = pattern;
 
 			// we're traversing the pattern from the end to the beginning --> the first state is the final state
 			State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
+			patternNames.add(currentPattern.getName());
 
 			states.put(currentPattern.getName(), currentState);
 
@@ -100,6 +105,11 @@ public class NFACompiler {
 				succeedingState = currentState;
 				currentPattern = currentPattern.getPrevious();
 
+				if (!patternNames.add(currentPattern.getName())) {
+					throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
+						"Pattern names must be unique.");
+				}
+
 				Time currentWindowTime = currentPattern.getWindowTime();
 
 				if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 9269dcb..7ea675f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -93,7 +93,7 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
-	 * Specifies a filter condition which is ORed with an existing filter function.
+	 * Specifies a filter condition which is OR'ed with an existing filter function.
 	 *
 	 * @param orFilterFunction OR filter condition
 	 * @return The same pattern operator where the new filter condition is set

http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index c790c35..d11f3a8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.util.Collection;
 import java.util.HashMap;
@@ -42,6 +44,38 @@ import static org.junit.Assert.assertEquals;
 
 public class NFACompilerTest extends TestLogger {
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	@Test
+	public void testNFACompilerUniquePatternName() {
+
+		// configure the rule to expect the duplicate-pattern-name failure
+		expectedException.expect(MalformedPatternException.class);
+		expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique.");
+
+		Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
+			.followedBy("middle").where(new TestFilter())
+			.followedBy("start").where(new TestFilter());
+
+		// compilation must throw here because two patterns share the name "start".
+		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+	}
+
+	/**
+	 * A filter implementation to test invalid pattern specification with
+	 * duplicate pattern names. Check {@link #testNFACompilerUniquePatternName()}.
+	 */
+	private static class TestFilter implements FilterFunction<Event> {
+
+		private static final long serialVersionUID = -3863103355752267133L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			throw new RuntimeException("It should never arrive here.");
+		}
+	}
+
 	/**
 	 * Tests that the NFACompiler generates the correct NFA from a given Pattern
 	 */