Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/01 09:41:00 UTC

[jira] [Commented] (FLINK-10414) Add skip to next strategy

    [ https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633773#comment-16633773 ] 

ASF GitHub Bot commented on FLINK-10414:
----------------------------------------

dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next strategy
URL: https://github.com/apache/flink/pull/6756
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index d7f915fc62e..c8e836349ef 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1288,6 +1288,15 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff
         </td>
         <td>After found matching <code>b1 b2 b3 c</code>, the match process will not discard any result.</td>
     </tr>
+    <tr>
+        <td><strong>SKIP_TO_NEXT</strong></td>
+        <td>
+            <code>b1 b2 b3 c</code><br>
+            <code>b2 b3 c</code><br>
+            <code>b3 c</code><br>
+        </td>
+        <td>After the match <code>b1 b2 b3 c</code> is found, the match process will not discard any result, because no other match could start at <code>b1</code>.</td>
+    </tr>
     <tr>
         <td><strong>SKIP_PAST_LAST_EVENT</strong></td>
         <td>
@@ -1296,7 +1305,7 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff
         <td>After found matching <code>b1 b2 b3 c</code>, the match process will discard all started partial matches.</td>
     </tr>
     <tr>
-        <td><strong>SKIP_TO_FIRST</strong>[<code>b*</code>]</td>
+        <td><strong>SKIP_TO_FIRST</strong>[<code>b</code>]</td>
         <td>
             <code>b1 b2 b3 c</code><br>
             <code>b2 b3 c</code><br>
@@ -1340,7 +1349,35 @@ Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a b c1 c2 c3 d` Then the r
             <code>a b c1 c2 c3 d</code><br>
             <code>c1 c2 c3 d</code><br>
         </td>
-        <td>After found matching <code>a b c1 c2 c3 d</code>, the match process will try to discard all partial matches started before <code>c1</code>. There is one such match <code>b c1 c2 c3 d</code>.</td>
+        <td>After the match <code>a b c1 c2 c3 d</code> is found, the match process will discard all partial matches started before <code>c1</code>. There is one such match: <code>b c1 c2 c3 d</code>.</td>
+    </tr>
+</table>
+
+To better understand the difference between NO_SKIP and SKIP_TO_NEXT, take a look at the following example.
+For the pattern `a b+` and the sequence `a b1 b2 b3`, the results will be:
+
+
+<table class="table table-bordered">
+    <tr>
+        <th class="text-left" style="width: 25%">Skip Strategy</th>
+        <th class="text-center" style="width: 25%">Result</th>
+        <th class="text-center"> Description</th>
+    </tr>
+    <tr>
+        <td><strong>NO_SKIP</strong></td>
+        <td>
+            <code>a b1</code><br>
+            <code>a b1 b2</code><br>
+            <code>a b1 b2 b3</code><br>
+        </td>
+        <td>After the match <code>a b1</code> is found, the match process will not discard any result.</td>
+    </tr>
+    <tr>
+        <td><strong>SKIP_TO_NEXT</strong></td>
+        <td>
+            <code>a b1</code><br>
+        </td>
+        <td>After the match <code>a b1</code> is found, the match process will discard all partial matches started at <code>a</code>. This means neither <code>a b1 b2</code> nor <code>a b1 b2 b3</code> can be generated.</td>
     </tr>
 </table>
 
@@ -1354,6 +1391,10 @@ To specify which skip strategy to use, just create an `AfterMatchSkipStrategy` b
         <td><code>AfterMatchSkipStrategy.noSkip()</code></td>
         <td>Create a <strong>NO_SKIP</strong> skip strategy </td>
     </tr>
+    <tr>
+        <td><code>AfterMatchSkipStrategy.skipToNext()</code></td>
+        <td>Create a <strong>SKIP_TO_NEXT</strong> skip strategy </td>
+    </tr>
     <tr>
         <td><code>AfterMatchSkipStrategy.skipPastLastEvent()</code></td>
         <td>Create a <strong>SKIP_PAST_LAST_EVENT</strong> skip strategy </td>
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index f4448a35560..7578e29bbfa 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -38,7 +38,7 @@
 	private static final long serialVersionUID = -4048930333619068531L;
 
 	/**
-	 * Discards every partial match that contains event of the match preceding the first of *PatternName*.
+	 * Discards every partial match that started before the first event of the emitted match mapped to *PatternName*.
 	 *
 	 * @param patternName the pattern name to skip to
 	 * @return the created AfterMatchSkipStrategy
@@ -48,7 +48,7 @@ public static SkipToFirstStrategy skipToFirst(String patternName) {
 	}
 
 	/**
-	 * Discards every partial match that contains event of the match preceding the last of *PatternName*.
+	 * Discards every partial match that started before the last event of the emitted match mapped to *PatternName*.
 	 *
 	 * @param patternName the pattern name to skip to
 	 * @return the created AfterMatchSkipStrategy
@@ -58,7 +58,7 @@ public static SkipToLastStrategy skipToLast(String patternName) {
 	}
 
 	/**
-	 * Discards every partial match that contains event of the match.
+	 * Discards every partial match that started before the emitted match ended.
 	 *
 	 * @return the created AfterMatchSkipStrategy
 	 */
@@ -66,6 +66,15 @@ public static SkipPastLastStrategy skipPastLastEvent() {
 		return SkipPastLastStrategy.INSTANCE;
 	}
 
+	/**
+	 * Discards every partial match that started with the same event as the emitted match.
+	 *
+	 * @return the created AfterMatchSkipStrategy
+	 */
+	public static AfterMatchSkipStrategy skipToNext() {
+		return SkipToNextStrategy.INSTANCE;
+	}
+
 	/**
 	 * Every possible match will be emitted.
 	 *
@@ -149,6 +158,22 @@ static EventId max(EventId o1, EventId o2) {
 		}
 	}
 
+	static EventId min(EventId o1, EventId o2) {
+		if (o2 == null) {
+			return o1;
+		}
+
+		if (o1 == null) {
+			return o2;
+		}
+
+		if (o1.compareTo(o2) <= 0) {
+			return o1;
+		} else {
+			return o2;
+		}
+	}
+
 	/** Forbid further extending. */
 	AfterMatchSkipStrategy() {
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
index 952d91acb92..c8441e54fa1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipPastLastStrategy.java
@@ -25,9 +25,9 @@
 import java.util.Map;
 
 /**
- * Discards every partial match that contains event of the match.
+ * Discards every partial match that started before the emitted match ended.
  */
-public class SkipPastLastStrategy extends AfterMatchSkipStrategy {
+public final class SkipPastLastStrategy extends SkipRelativeToWholeMatchStrategy {
 
 	public static final SkipPastLastStrategy INSTANCE = new SkipPastLastStrategy();
 
@@ -36,16 +36,6 @@
 	private SkipPastLastStrategy() {
 	}
 
-	@Override
-	public boolean isSkipStrategy() {
-		return true;
-	}
-
-	@Override
-	protected boolean shouldPrune(EventId startEventID, EventId pruningId) {
-		return startEventID != null && startEventID.compareTo(pruningId) <= 0;
-	}
-
 	@Override
 	protected EventId getPruningId(final Collection<Map<String, List<EventId>>> match) {
 		EventId pruningId = null;
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipRelativeToWholeMatchStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipRelativeToWholeMatchStrategy.java
new file mode 100644
index 00000000000..00bce97f38f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipRelativeToWholeMatchStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+abstract class SkipRelativeToWholeMatchStrategy extends AfterMatchSkipStrategy {
+	private static final long serialVersionUID = -3214720554878479037L;
+
+	@Override
+	public final boolean isSkipStrategy() {
+		return true;
+	}
+
+	@Override
+	protected final boolean shouldPrune(EventId startEventID, EventId pruningId) {
+		return startEventID != null && startEventID.compareTo(pruningId) <= 0;
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
index e8befebe5fc..6425a13e36e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToFirstStrategy.java
@@ -19,7 +19,7 @@
 package org.apache.flink.cep.nfa.aftermatch;
 
 /**
- * Discards every partial match that contains event of the match preceding the first of *PatternName*.
+ * Discards every partial match that started before the first event of the emitted match mapped to *PatternName*.
  */
 public final class SkipToFirstStrategy extends SkipToElementStrategy {
 	private static final long serialVersionUID = 7127107527654629026L;
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
index 3c33bd13f62..ec8ea549d0e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToLastStrategy.java
@@ -19,7 +19,7 @@
 package org.apache.flink.cep.nfa.aftermatch;
 
 /**
- * Discards every partial match that contains event of the match preceding the last of *PatternName*.
+ * Discards every partial match that started before the last event of the emitted match mapped to *PatternName*.
  */
 public final class SkipToLastStrategy extends SkipToElementStrategy {
 	private static final long serialVersionUID = 7585116990619594531L;
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToNextStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToNextStrategy.java
new file mode 100644
index 00000000000..5d7b354bd43
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToNextStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Discards every partial match that started with the same event as the emitted match.
+ */
+public final class SkipToNextStrategy extends SkipRelativeToWholeMatchStrategy {
+
+	public static final SkipToNextStrategy INSTANCE = new SkipToNextStrategy();
+
+	private static final long serialVersionUID = -6490314998588752621L;
+
+	private SkipToNextStrategy() {
+	}
+
+	@Override
+	protected EventId getPruningId(final Collection<Map<String, List<EventId>>> match) {
+		EventId pruningId = null;
+		for (Map<String, List<EventId>> resultMap : match) {
+			for (List<EventId> eventList : resultMap.values()) {
+				pruningId = min(pruningId, eventList.get(0));
+			}
+		}
+
+		return pruningId;
+	}
+
+	@Override
+	public String toString() {
+		return "SkipToNextStrategy{}";
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index 6681090d8b2..d42839e3071 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -45,7 +45,7 @@
 public class AfterMatchSkipITCase extends TestLogger{
 
 	@Test
-	public void testSkipToNext() throws Exception {
+	public void testNoSkip() throws Exception {
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		Event a1 = new Event(1, "a", 0.0);
@@ -83,6 +83,121 @@ public boolean filter(Event value) throws Exception {
 		));
 	}
 
+	@Test
+	public void testNoSkipWithFollowedByAny() throws Exception {
+		List<List<Event>> resultingPatterns = TwoVariablesFollowedByAny.compute(AfterMatchSkipStrategy.noSkip());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(TwoVariablesFollowedByAny.a1, TwoVariablesFollowedByAny.b1),
+			Lists.newArrayList(TwoVariablesFollowedByAny.a1, TwoVariablesFollowedByAny.b2),
+			Lists.newArrayList(TwoVariablesFollowedByAny.a2, TwoVariablesFollowedByAny.b2)
+		));
+	}
+
+	@Test
+	public void testSkipToNextWithFollowedByAny() throws Exception {
+		List<List<Event>> resultingPatterns = TwoVariablesFollowedByAny.compute(AfterMatchSkipStrategy.skipToNext());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(TwoVariablesFollowedByAny.a1, TwoVariablesFollowedByAny.b1),
+			Lists.newArrayList(TwoVariablesFollowedByAny.a2, TwoVariablesFollowedByAny.b2)
+		));
+	}
+
+	static class TwoVariablesFollowedByAny {
+
+		static Event a1 = new Event(1, "a", 0.0);
+		static Event b1 = new Event(2, "b", 0.0);
+		static Event a2 = new Event(4, "a", 0.0);
+		static Event b2 = new Event(5, "b", 0.0);
+
+		private static List<List<Event>> compute(AfterMatchSkipStrategy skipStrategy) throws Exception {
+			List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+			streamEvents.add(new StreamRecord<>(a1));
+			streamEvents.add(new StreamRecord<>(b1));
+			streamEvents.add(new StreamRecord<>(a2));
+			streamEvents.add(new StreamRecord<>(b2));
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+				.where(new SimpleCondition<Event>() {
+
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().equals("a");
+					}
+				}).followedByAny("end")
+				.where(new SimpleCondition<Event>() {
+
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().equals("b");
+					}
+				});
+
+			NFA<Event> nfa = compile(pattern, false);
+
+			return feedNFA(streamEvents, nfa, skipStrategy);
+		}
+	}
+
+	@Test
+	public void testNoSkipWithQuantifierAtTheEnd() throws Exception {
+		List<List<Event>> resultingPatterns = QuantifierAtEndOfPattern.compute(AfterMatchSkipStrategy.noSkip());
+
+		compareMaps(resultingPatterns, Lists.newArrayList(
+			Lists.newArrayList(QuantifierAtEndOfPattern.a1, QuantifierAtEndOfPattern.b1, QuantifierAtEndOfPattern.b2, QuantifierAtEndOfPattern.b3),
+			Lists.newArrayList(QuantifierAtEndOfPattern.a1, QuantifierAtEndOfPattern.b1, QuantifierAtEndOfPattern.b2),
+			Lists.newArrayList(QuantifierAtEndOfPattern.a1, QuantifierAtEndOfPattern.b1)
+		));
+	}
+
+	@Test
+	public void testSkipToNextWithQuantifierAtTheEnd() throws Exception {
+		List<List<Event>> resultingPatterns = QuantifierAtEndOfPattern.compute(AfterMatchSkipStrategy.skipToNext());
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(QuantifierAtEndOfPattern.a1, QuantifierAtEndOfPattern.b1)
+		));
+	}
+
+	static class QuantifierAtEndOfPattern {
+
+		static Event a1 = new Event(1, "a", 0.0);
+		static Event b1 = new Event(2, "b", 0.0);
+		static Event b2 = new Event(4, "b", 0.0);
+		static Event b3 = new Event(5, "b", 0.0);
+
+		private static List<List<Event>> compute(AfterMatchSkipStrategy skipStrategy) throws Exception {
+			List<StreamRecord<Event>> streamEvents = new ArrayList<>();
+
+			streamEvents.add(new StreamRecord<>(a1));
+			streamEvents.add(new StreamRecord<>(b1));
+			streamEvents.add(new StreamRecord<>(b2));
+			streamEvents.add(new StreamRecord<>(b3));
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+				.where(new SimpleCondition<Event>() {
+
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().equals("a");
+					}
+				}).next("end")
+				.where(new SimpleCondition<Event>() {
+
+					@Override
+					public boolean filter(Event value) throws Exception {
+						return value.getName().equals("b");
+					}
+				}).oneOrMore();
+
+			NFA<Event> nfa = compile(pattern, false);
+
+			return feedNFA(streamEvents, nfa, skipStrategy);
+		}
+	}
+
 	@Test
 	public void testSkipPastLast() throws Exception {
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
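
The pruning rule in the diff above can be read in isolation: SkipToNextStrategy takes the smallest event id of the emitted match as the pruning id, and the shared shouldPrune check discards every partial match whose start event is at or before that id. The following is only a minimal sketch of that rule, not the Flink code. It assumes event ids can be modelled as plain Long values (the real EventId type is not shown in this message), and the class name PruningIdSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch; Long stands in for Flink's EventId (an assumption).
final class PruningIdSketch {

    // Null-tolerant minimum, following the shape of the min(...) helper in the diff.
    static Long min(Long o1, Long o2) {
        if (o2 == null) {
            return o1;
        }
        if (o1 == null) {
            return o2;
        }
        return o1.compareTo(o2) <= 0 ? o1 : o2;
    }

    // Smallest first-event id over all pattern variables of the match,
    // i.e. the event the emitted match started with.
    static Long getPruningId(Collection<Map<String, List<Long>>> match) {
        Long pruningId = null;
        for (Map<String, List<Long>> resultMap : match) {
            for (List<Long> eventList : resultMap.values()) {
                pruningId = min(pruningId, eventList.get(0));
            }
        }
        return pruningId;
    }

    // A partial match is pruned when it started at or before the pruning id.
    static boolean shouldPrune(Long startEventId, Long pruningId) {
        return startEventId != null && startEventId.compareTo(pruningId) <= 0;
    }

    public static void main(String[] args) {
        // Emitted match: "start" -> event 1, "end" -> event 2.
        Map<String, List<Long>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList(1L));
        match.put("end", Arrays.asList(2L));

        Long pruningId = getPruningId(Collections.singletonList(match));
        System.out.println("pruningId = " + pruningId);   // pruningId = 1
        System.out.println(shouldPrune(1L, pruningId));   // true: same start event
        System.out.println(shouldPrune(4L, pruningId));   // false: started later
    }
}
```

In this sketch a partial match that started with event 1 is pruned once the match is emitted, while one that started with event 4 survives, which lines up with the expectations in testSkipToNextWithFollowedByAny.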


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add skip to next strategy
> -------------------------
>
>                 Key: FLINK-10414
>                 URL: https://issues.apache.org/jira/browse/FLINK-10414
>             Project: Flink
>          Issue Type: Improvement
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>
> Add a skip-to-next strategy that discards all partial matches that started with the same element as the found match.
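
To make the description above concrete, here is a toy model of the observable effect, assuming candidate matches are handed over in the order they would be emitted. It is not how the Flink NFA works internally (there the pruning acts on partial matches in the shared buffer); the class SkipToNextToyFilter and its method are hypothetical and only reproduce the results listed in the documentation table and the new tests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy filter: keeps only the first match emitted for each start event.
final class SkipToNextToyFilter {

    static List<List<String>> skipToNext(List<List<String>> candidatesInEmissionOrder) {
        Set<String> usedStartEvents = new HashSet<>();
        List<List<String>> kept = new ArrayList<>();
        for (List<String> candidate : candidatesInEmissionOrder) {
            String startEvent = candidate.get(0);
            // Set.add returns false when a match starting at this event was already emitted.
            if (usedStartEvents.add(startEvent)) {
                kept.add(candidate);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Pattern `a b+` on `a b1 b2 b3`: NO_SKIP would emit all three candidates.
        List<List<String>> quantifierAtEnd = Arrays.asList(
            Arrays.asList("a", "b1"),
            Arrays.asList("a", "b1", "b2"),
            Arrays.asList("a", "b1", "b2", "b3"));
        System.out.println(skipToNext(quantifierAtEnd)); // [[a, b1]]

        // Pattern `a followedByAny b` on `a1 b1 a2 b2`.
        List<List<String>> followedByAny = Arrays.asList(
            Arrays.asList("a1", "b1"),
            Arrays.asList("a1", "b2"),
            Arrays.asList("a2", "b2"));
        System.out.println(skipToNext(followedByAny)); // [[a1, b1], [a2, b2]]
    }
}
```

For the `b+ c` example from the documentation, every candidate starts at a different event, so this filter keeps all of them, which is why SKIP_TO_NEXT and NO_SKIP give the same result there.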



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)