You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/02/28 10:26:47 UTC
flink git commit: [FLINK-5871] [cep] Enforce uniqueness of pattern
names in CEP.
Repository: flink
Updated Branches:
refs/heads/master 1a062b796 -> a92d78746
[FLINK-5871] [cep] Enforce uniqueness of pattern names in CEP.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a92d7874
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a92d7874
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a92d7874
Branch: refs/heads/master
Commit: a92d78746318e9bbe949d9d030fc21b5348b56e2
Parents: 1a062b7
Author: kl0u <kk...@gmail.com>
Authored: Tue Feb 21 13:51:11 2017 +0100
Committer: kl0u <kk...@gmail.com>
Committed: Tue Feb 28 09:35:49 2017 +0100
----------------------------------------------------------------------
.../nfa/compiler/MalformedPatternException.java | 32 ++++++++++++++++++
.../flink/cep/nfa/compiler/NFACompiler.java | 10 ++++++
.../org/apache/flink/cep/pattern/Pattern.java | 2 +-
.../flink/cep/nfa/compiler/NFACompilerTest.java | 34 ++++++++++++++++++++
4 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
new file mode 100644
index 0000000..a3bb5f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/MalformedPatternException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+/**
+ * An unchecked exception used to indicate that a {@link org.apache.flink.cep.pattern.Pattern}
+ * was not specified correctly, e.g. because the same pattern name is used more than once.
+ */
+public class MalformedPatternException extends RuntimeException {
+
+ private static final long serialVersionUID = 7751134834983361543L;
+
+ public MalformedPatternException(String message) {
+ super(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 878e0b2..18ed21f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -34,6 +34,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
/**
* Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
@@ -84,12 +85,16 @@ public class NFACompiler {
Map<String, State<T>> states = new HashMap<>();
long windowTime;
+ // this is used to enforce pattern name uniqueness.
+ Set<String> patternNames = new HashSet<>();
+
Pattern<T, ?> succeedingPattern;
State<T> succeedingState;
Pattern<T, ?> currentPattern = pattern;
// we're traversing the pattern from the end to the beginning --> the first state is the final state
State<T> currentState = new State<>(currentPattern.getName(), State.StateType.Final);
+ patternNames.add(currentPattern.getName());
states.put(currentPattern.getName(), currentState);
@@ -100,6 +105,11 @@ public class NFACompiler {
succeedingState = currentState;
currentPattern = currentPattern.getPrevious();
+ if (!patternNames.add(currentPattern.getName())) {
+ throw new MalformedPatternException("Duplicate pattern name: " + currentPattern.getName() + ". " +
+ "Pattern names must be unique.");
+ }
+
Time currentWindowTime = currentPattern.getWindowTime();
if (currentWindowTime != null && currentWindowTime.toMilliseconds() < windowTime) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 9269dcb..7ea675f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -93,7 +93,7 @@ public class Pattern<T, F extends T> {
}
/**
- * Specifies a filter condition which is ORed with an existing filter function.
+ * Specifies a filter condition which is OR'ed with an existing filter function.
*
* @param orFilterFunction OR filter condition
* @return The same pattern operator where the new filter condition is set
http://git-wip-us.apache.org/repos/asf/flink/blob/a92d7874/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index c790c35..d11f3a8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -30,7 +30,9 @@ import org.apache.flink.cep.nfa.StateTransition;
import org.apache.flink.cep.nfa.StateTransitionAction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.util.TestLogger;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.Collection;
import java.util.HashMap;
@@ -42,6 +44,38 @@ import static org.junit.Assert.assertEquals;
public class NFACompilerTest extends TestLogger {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testNFACompilerUniquePatternName() {
+
+ // arm the rule: the compile call below is expected to fail with this exception type and message
+ expectedException.expect(MalformedPatternException.class);
+ expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique.");
+
+ Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
+ .followedBy("middle").where(new TestFilter())
+ .followedBy("start").where(new TestFilter());
+
+ // compilation must throw here because the name "start" is used for two different patterns.
+ NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+ }
+
+ /**
+ * A filter used only to build the invalid pattern (duplicate pattern names) in
+ * {@link #testNFACompilerUniquePatternName()}. It throws if it is ever invoked.
+ */
+ private static class TestFilter implements FilterFunction<Event> {
+
+ private static final long serialVersionUID = -3863103355752267133L;
+
+ @Override
+ public boolean filter(Event value) throws Exception {
+ throw new RuntimeException("It should never arrive here.");
+ }
+ }
+
/**
* Tests that the NFACompiler generates the correct NFA from a given Pattern
*/