You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/10/01 07:46:39 UTC

[GitHub] dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…

dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ad321bf71b5..d7f915fc62e 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1385,6 +1385,23 @@ Pattern.begin("patternName", skipStrategy)
 </div>
 </div>
 
+{% warn Attention %} For SKIP_TO_FIRST/LAST there are two options how to handle cases when there are no elements mapped to
+the specified variable. By default a NO_SKIP strategy will be used in this case. The other option is to throw exception in such situation.
+One can enable this option by:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+</div>
+</div>
+
 ## Detecting Patterns
 
 After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index 8151a124af4..f4448a35560 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -43,8 +43,8 @@
 	 * @param patternName the pattern name to skip to
 	 * @return the created AfterMatchSkipStrategy
 	 */
-	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-		return new SkipToFirstStrategy(patternName);
+	public static SkipToFirstStrategy skipToFirst(String patternName) {
+		return new SkipToFirstStrategy(patternName, false);
 	}
 
 	/**
@@ -53,8 +53,8 @@ public static AfterMatchSkipStrategy skipToFirst(String patternName) {
 	 * @param patternName the pattern name to skip to
 	 * @return the created AfterMatchSkipStrategy
 	 */
-	public static AfterMatchSkipStrategy skipToLast(String patternName) {
-		return new SkipToLastStrategy(patternName);
+	public static SkipToLastStrategy skipToLast(String patternName) {
+		return new SkipToLastStrategy(patternName, false);
 	}
 
 	/**
@@ -62,7 +62,7 @@ public static AfterMatchSkipStrategy skipToLast(String patternName) {
 	 *
 	 * @return the created AfterMatchSkipStrategy
 	 */
-	public static AfterMatchSkipStrategy skipPastLastEvent() {
+	public static SkipPastLastStrategy skipPastLastEvent() {
 		return SkipPastLastStrategy.INSTANCE;
 	}
 
@@ -71,7 +71,7 @@ public static AfterMatchSkipStrategy skipPastLastEvent() {
 	 *
 	 * @return the created AfterMatchSkipStrategy
 	 */
-	public static AfterMatchSkipStrategy noSkip() {
+	public static NoSkipStrategy noSkip() {
 		return NoSkipStrategy.INSTANCE;
 	}
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
new file mode 100644
index 00000000000..5554151ccbd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+abstract class SkipToElementStrategy extends AfterMatchSkipStrategy {
+	private static final long serialVersionUID = 7127107527654629026L;
+	private final String patternName;
+	private final boolean shouldThrowException;
+
+	SkipToElementStrategy(String patternName, boolean shouldThrowException) {
+		this.patternName = checkNotNull(patternName);
+		this.shouldThrowException = shouldThrowException;
+	}
+
+	@Override
+	public boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) < 0;
+	}
+
+	@Override
+	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			List<EventId> pruningPattern = resultMap.get(patternName);
+			if (pruningPattern == null || pruningPattern.isEmpty()) {
+				if (shouldThrowException) {
+					throw new FlinkRuntimeException(String.format(
+						"Could not skip to %s. No such element in the found match %s",
+						patternName,
+						resultMap));
+				}
+			} else {
+				pruningId = max(pruningId, pruningPattern.get(getIndex(pruningPattern.size())));
+			}
+		}
+
+		return pruningId;
+	}
+
+	@Override
+	public Optional<String> getPatternName() {
+		return Optional.of(patternName);
+	}
+
+	/**
+	 * Tells which element from the list of events mapped to *PatternName* to use.
+	 *
+	 * @param size number of elements mapped to the *PatternName*
+	 * @return index of event mapped to *PatternName* to use for pruning
+	 */
+	abstract int getIndex(int size);
+
+	/**
+	 * Enables throwing exception if no events mapped to the *PatternName*. If not enabled and no events were mapped,
+	 * {@link NoSkipStrategy} will be used
+	 */
+	public abstract SkipToElementStrategy throwExceptionOnMiss();
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
index 7d7be4a4b1e..e8befebe5fc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
@@ -18,59 +18,30 @@
 
 package org.apache.flink.cep.nfa.aftermatch;
 
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Discards every partial match that contains event of the match preceding the first of *PatternName*.
  */
-public class SkipToFirstStrategy extends AfterMatchSkipStrategy {
-
+public final class SkipToFirstStrategy extends SkipToElementStrategy {
 	private static final long serialVersionUID = 7127107527654629026L;
-	private final String patternName;
 
-	SkipToFirstStrategy(String patternName) {
-		this.patternName = checkNotNull(patternName);
+	SkipToFirstStrategy(String patternName, boolean shouldThrowException) {
+		super(patternName, shouldThrowException);
 	}
 
 	@Override
-	public boolean isSkipStrategy() {
-		return true;
-	}
-
-	@Override
-	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
-		return startEventID != null && startEventID.compareTo(pruningId) < 0;
-	}
-
-	@Override
-	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
-		EventId pruniningId = null;
-		for (Map<String, List<EventId>> resultMap : match) {
-			List<EventId> pruningPattern = resultMap.get(patternName);
-			if (pruningPattern != null && !pruningPattern.isEmpty()) {
-				pruniningId = max(pruniningId, pruningPattern.get(0));
-			}
-		}
-
-		return pruniningId;
+	public SkipToElementStrategy throwExceptionOnMiss() {
+		return new SkipToFirstStrategy(getPatternName().get(), true);
 	}
 
 	@Override
-	public Optional<String> getPatternName() {
-		return Optional.of(patternName);
+	int getIndex(int size) {
+		return 0;
 	}
 
 	@Override
 	public String toString() {
 		return "SkipToFirstStrategy{" +
-			"patternName='" + patternName + '\'' +
+			"patternName='" + getPatternName().get() + '\'' +
 			'}';
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
index 0f6c3eddcfb..3c33bd13f62 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
@@ -18,59 +18,30 @@
 
 package org.apache.flink.cep.nfa.aftermatch;
 
-import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * Discards every partial match that contains event of the match preceding the last of *PatternName*.
  */
-public class SkipToLastStrategy extends AfterMatchSkipStrategy {
+public final class SkipToLastStrategy extends SkipToElementStrategy {
 	private static final long serialVersionUID = 7585116990619594531L;
-	private final String patternName;
 
-	SkipToLastStrategy(String patternName) {
-		this.patternName = checkNotNull(patternName);
-	}
-
-	@Override
-	public boolean isSkipStrategy() {
-		return true;
+	SkipToLastStrategy(String patternName, boolean shouldThrowException) {
+		super(patternName, shouldThrowException);
 	}
 
 	@Override
-	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
-		return startEventID != null && startEventID.compareTo(pruningId) < 0;
-	}
-
-	@Override
-	protected EventId getPruningId(Collection<Map<String, List<EventId>>> match) {
-		EventId pruningId = null;
-		for (Map<String, List<EventId>> resultMap : match) {
-			List<EventId> pruningPattern = resultMap.get(patternName);
-
-			if (pruningPattern != null && !pruningPattern.isEmpty()) {
-				pruningId = max(pruningId, pruningPattern.get(pruningPattern.size() - 1));
-			}
-		}
-
-		return pruningId;
+	public SkipToElementStrategy throwExceptionOnMiss() {
+		return new SkipToLastStrategy(getPatternName().get(), true);
 	}
 
 	@Override
-	public Optional<String> getPatternName() {
-		return Optional.of(patternName);
+	int getIndex(int size) {
+		return size - 1;
 	}
 
 	@Override
 	public String toString() {
 		return "SkipToLastStrategy{" +
-			"patternName='" + patternName + '\'' +
+			"patternName='" + getPatternName().get() + '\'' +
 			'}';
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index 4462d103164..6681090d8b2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -24,6 +24,7 @@
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -432,6 +433,74 @@ public boolean filter(Event value) throws Exception {
 		));
 	}
 
+	@Test(expected = FlinkRuntimeException.class)
+	public void testSkipToFirstNonExistentPosition() throws Exception {
+		MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b").throwExceptionOnMiss());
+
+		//exception should be thrown
+	}
+
+	@Test
+	public void testSkipToFirstNonExistentPositionWithoutException() throws Exception {
+		List<List<Event>> resultingPatterns = MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b"));
+
+		compareMaps(resultingPatterns, Collections.singletonList(
+			Lists.newArrayList(MissedSkipTo.a, MissedSkipTo.c)
+		));
+	}
+
+	@Test(expected = FlinkRuntimeException.class)
+	public void testSkipToLastNonExistentPosition() throws Exception {
+		MissedSkipTo.compute(AfterMatchSkipStrategy.skipToLast("b").throwExceptionOnMiss());
+
+		//exception should be thrown
+	}
+
+	@Test
+	public void testSkipToLastNonExistentPositionWithoutException() throws Exception {
+		List<List<Event>> resultingPatterns = MissedSkipTo.compute(AfterMatchSkipStrategy.skipToFirst("b"));
+
+		compareMaps(resultingPatterns, Collections.singletonList(
+			Lists.newArrayList(MissedSkipTo.a, MissedSkipTo.c)
+		));
+	}
+
+	static class MissedSkipTo {
+		static Event a = new Event(1, "a", 0.0);
+		static Event c = new Event(4, "c", 0.0);
+
+		static List<List<Event>> compute(AfterMatchSkipStrategy skipStrategy) throws Exception {
+			List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+			streamEvents.add(new StreamRecord<>(a));
+			streamEvents.add(new StreamRecord<>(c));
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("a").where(
+				new SimpleCondition<Event>() {
+
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().contains("a");
+					}
+				}
+			).next("b").where(new SimpleCondition<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().contains("b");
+				}
+			}).oneOrMore().optional().consecutive()
+				.next("c").where(new SimpleCondition<Event>() {
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().contains("c");
+					}
+				});
+			NFA<Event> nfa = compile(pattern, false);
+
+			return feedNFA(streamEvents, nfa, skipStrategy);
+		}
+	}
+
 	@Test
 	public void testSkipToLastWithOneOrMore() throws Exception {
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services