Posted to commits@flink.apache.org by tw...@apache.org on 2018/12/03 17:16:35 UTC

[flink] branch master updated (59ebdb2 -> 63a6a0e)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 59ebdb2  [FLINK-10874][kafka-docs] Document likely cause of UnknownTopicOrPartitionException
     new d73b01b  [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
     new 63a6a0e  [FLINK-7603] [docs] Update documentation with WITHIN clause for MATCH_RECOGNIZE

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/table/sql.md                              |  1 +
 docs/dev/table/streaming/match_recognize.md        | 76 ++++++++++++++++++++--
 .../plan/nodes/datastream/DataStreamMatch.scala    | 41 ++++++++----
 .../flink/table/match/PatternTranslatorTest.scala  | 59 +++++++++++++++++
 .../table/match/PatternTranslatorTestBase.scala    | 17 +++--
 5 files changed, 169 insertions(+), 25 deletions(-)


[flink] 02/02: [FLINK-7603] [docs] Update documentation with WITHIN clause for MATCH_RECOGNIZE

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 63a6a0e1411e25be50a48302422c0bde0dd3ac78
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Nov 28 14:29:14 2018 +0100

    [FLINK-7603] [docs] Update documentation with WITHIN clause for MATCH_RECOGNIZE
    
    This closes #7187.
---
 docs/dev/table/sql.md                       |  1 +
 docs/dev/table/streaming/match_recognize.md | 76 ++++++++++++++++++++++++++---
 2 files changed, 71 insertions(+), 6 deletions(-)

diff --git a/docs/dev/table/sql.md b/docs/dev/table/sql.md
index 90e2006..55c144a 100644
--- a/docs/dev/table/sql.md
+++ b/docs/dev/table/sql.md
@@ -211,6 +211,7 @@ matchRecognize:
             | SKIP TO variable )
       ]
       PATTERN '(' pattern ')'
+      [ WITHIN intervalLiteral ]
       DEFINE variable AS condition [, variable AS condition ]*
       ')'
 
diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index e61d384..01799cc 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -93,7 +93,7 @@ Every `MATCH_RECOGNIZE` query consists of the following clauses:
 * [MEASURES](#define--measures) - defines output of the clause; similar to a `SELECT` clause.
 * [ONE ROW PER MATCH](#output-mode) - output mode which defines how many rows per match should be produced.
 * [AFTER MATCH SKIP](#after-match-strategy) - specifies where the next match should start; this is also a way to control how many distinct matches a single event can belong to.
-* [PATTERN](#defining-pattern) - allows constructing patterns that will be searched for using a _regular expression_-like syntax.
+* [PATTERN](#defining-a-pattern) - allows constructing patterns that will be searched for using a _regular expression_-like syntax.
 * [DEFINE](#define--measures) - this section defines the conditions that the pattern variables must satisfy.
 
 <span class="label label-danger">Attention</span> Currently, the `MATCH_RECOGNIZE` clause can only be applied to an [append table](dynamic_tables.html#update-and-append-queries). Furthermore, it always produces
@@ -206,7 +206,7 @@ The `DEFINE` and `MEASURES` keywords have similar meanings to the `WHERE` and `S
 The `MEASURES` clause defines what will be included in the output of a matching pattern. It can project columns and define expressions for evaluation.
 The number of produced rows depends on the [output mode](#output-mode) setting.
 
-The `DEFINE` clause specifies conditions that rows have to fulfill in order to be classified to a corresponding [pattern variable](#defining-pattern).
+The `DEFINE` clause specifies conditions that rows have to fulfill in order to be classified to a corresponding [pattern variable](#defining-a-pattern).
 If a condition is not defined for a pattern variable, a default condition will be used which evaluates to `true` for every row.
 
 For a more detailed explanation about expressions that can be used in those clauses, please have a look at the [event stream navigation](#pattern-navigation) section.
@@ -311,6 +311,71 @@ DEFINE
 
 <span class="label label-danger">Attention</span> The optional reluctant quantifier (`A??` or `A{0,1}?`) is not supported right now.
 
+### Time constraint
+
+Especially for streaming use cases, it is often required that a pattern finishes within a given period of time.
+This helps to limit the overall state size that Flink has to maintain internally, even in the case of greedy quantifiers.
+
+Therefore, Flink SQL supports the additional (non-standard SQL) `WITHIN` clause for defining a time constraint for a pattern. The clause can be defined after the `PATTERN` clause and takes an interval of millisecond resolution.
+
+If the time between the first and last event of a potential match is longer than the given value, such a match will not be appended to the result table.
+
+<span class="label label-info">Note</span> It is generally encouraged to use the `WITHIN` clause as it helps Flink with efficient memory management. Underlying state can be pruned once the threshold is reached.
+
+<span class="label label-danger">Attention</span> However, the `WITHIN` clause is not part of the SQL standard. The recommended way of dealing with time constraints might change in the future.
+
+The use of the `WITHIN` clause is illustrated in the following example query:
+
+{% highlight sql %}
+SELECT *
+FROM Ticker
+    MATCH_RECOGNIZE(
+        PARTITION BY symbol
+        ORDER BY rowtime
+        MEASURES
+            C.rowtime AS dropTime,
+            A.price - C.price AS dropDiff
+        PATTERN (A B* C) WITHIN INTERVAL '1' HOUR
+        ONE ROW PER MATCH
+        AFTER MATCH SKIP PAST LAST ROW
+        DEFINE
+            B AS B.price > A.price - 10,
+            C AS C.price < A.price - 10
+    )
+{% endhighlight %}
+
+The query detects a price drop of more than `10` that happens within an interval of 1 hour.
+
+Let's assume the query is used to analyze the following ticker data:
+
+{% highlight text %}
+symbol         rowtime         price    tax
+======  ====================  ======= =======
+'ACME'  '01-Apr-11 10:00:00'   20      1
+'ACME'  '01-Apr-11 10:20:00'   17      2
+'ACME'  '01-Apr-11 10:40:00'   18      1
+'ACME'  '01-Apr-11 11:00:00'   11      3
+'ACME'  '01-Apr-11 11:20:00'   14      2
+'ACME'  '01-Apr-11 11:40:00'   9       1
+'ACME'  '01-Apr-11 12:00:00'   15      1
+'ACME'  '01-Apr-11 12:20:00'   14      2
+'ACME'  '01-Apr-11 12:40:00'   24      2
+'ACME'  '01-Apr-11 13:00:00'   1       2
+'ACME'  '01-Apr-11 13:20:00'   19      1
+{% endhighlight %}
+
+The query will produce the following results:
+
+{% highlight text %}
+symbol         dropTime         dropDiff
+======  ====================  =============
+'ACME'  '01-Apr-11 13:00:00'      14
+{% endhighlight %}
+
+The resulting row represents a price drop from `15` (at `01-Apr-11 12:00:00`) to `1` (at `01-Apr-11 13:00:00`). The `dropDiff` column contains the price difference.
+
+Notice that even though the price also drops by more than `10` elsewhere, for example, by `11` (between `01-Apr-11 10:00:00` and `01-Apr-11 11:40:00`), the time difference between those two events is larger than 1 hour. Thus, they don't produce a match.
+
 Output Mode
 -----------
 
@@ -781,8 +846,8 @@ One has to keep in mind that in case of the `SKIP TO FIRST/LAST variable`strateg
 variable (e.g. for pattern `A*`). In such cases, a runtime exception will be thrown as the standard requires a valid row to continue the
 matching.
 
-
-### Controlling Memory Consumption
+Controlling Memory Consumption
+------------------------------
 
 Memory consumption is an important consideration when writing `MATCH_RECOGNIZE` queries, as the space of potential matches is built in a breadth-first-like manner.
 Having that in mind, one must make sure that the pattern can finish, preferably with a reasonable number of rows mapped to the match, as they have to fit into memory.
@@ -815,8 +880,7 @@ DEFINE
   C as C.price > 20
 {% endhighlight %}
 
-<span class="label label-danger">Attention</span> Please note that the `MATCH_RECOGNIZE` clause does not use a configured [state retention time](query_configuration.html#idle-state-retention-time). As of now, there is also no possibility to define a time restriction on the pattern to finish because there is no such possibility in the SQL standard. The community is in the process of designing a proper syntax for that
-feature right now.
+<span class="label label-danger">Attention</span> Please note that the `MATCH_RECOGNIZE` clause does not use a configured [state retention time](query_configuration.html#idle-state-retention-time). Use the `WITHIN` [clause](#time-constraint) for this purpose instead.
 
 Known Limitations
 -----------------


[flink] 01/02: [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d73b01b138dbc6dec078150d8cf6f9410fe68c7c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Nov 28 13:31:59 2018 +0100

    [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
    
    Introduces support for the WITHIN clause in MATCH_RECOGNIZE, which
    allows adding a time constraint to a pattern. It reuses the
    within function of Pattern in the CEP library. The behavior
    is such that the time difference between the first and the last
    row of a match must be smaller than the given period. The
    WITHIN clause accepts only a constant millisecond interval value.
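
    For illustration, a minimal sketch (not part of the commit itself) of
    how the clause maps onto the CEP Pattern API that the translation
    reuses; Pattern and Time are the same classes exercised by the tests
    in this change:

        import org.apache.flink.cep.pattern.Pattern
        import org.apache.flink.streaming.api.windowing.time.Time
        import org.apache.flink.types.Row

        // PATTERN (A B) WITHIN INTERVAL '1' HOUR corresponds roughly to:
        val pattern: Pattern[Row, Row] =
          Pattern.begin[Row]("A")
            .next("B")
            .within(Time.hours(1)) // prunes partial matches older than one hour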
---
 .../plan/nodes/datastream/DataStreamMatch.scala    | 41 ++++++++++-----
 .../flink/table/match/PatternTranslatorTest.scala  | 59 ++++++++++++++++++++++
 .../table/match/PatternTranslatorTestBase.scala    | 17 ++++---
 3 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
index 19b298c..493c092 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.`type`.SqlTypeFamily
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
 import org.apache.flink.cep.pattern.conditions.BooleanConditions
 import org.apache.flink.cep.{CEP, PatternStream}
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.windowing.time.Time
 
 import scala.collection.JavaConverters._
 import org.apache.flink.table.api._
@@ -95,6 +97,31 @@ class DataStreamMatch(
     explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
   }
 
+  private def translateTimeBound(interval: RexNode): Time = {
+    interval match {
+      case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
+        Time.milliseconds(x.getValueAs(classOf[JLong]))
+      case _ =>
+        throw new TableException("Only constant intervals with millisecond resolution " +
+          "are supported as time constraints of patterns.")
+    }
+  }
+
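+  // Compiles the MATCH_RECOGNIZE pattern into a CEP Pattern and collects the
+  // pattern variable names; the WITHIN bound (if present) is applied to the
+  // whole compiled pattern via Pattern#within.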
+  @VisibleForTesting
+  private[flink] def translatePattern(
+    config: TableConfig,
+    inputTypeInfo: TypeInformation[Row]
+  ): (Pattern[Row, Row], Iterable[String]) = {
+    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
+    val cepPattern = if (logicalMatch.interval != null) {
+      val interval = translateTimeBound(logicalMatch.interval)
+      logicalMatch.pattern.accept(patternVisitor).within(interval)
+    } else {
+      logicalMatch.pattern.accept(patternVisitor)
+    }
+    (cepPattern, patternVisitor.names)
+  }
+
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig)
@@ -120,9 +147,7 @@ class DataStreamMatch(
       crowInput,
       logicalMatch.orderKeys)
 
-    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
-    val cepPattern = logicalMatch.pattern.accept(patternVisitor)
-    val patternNames = patternVisitor.names
+    val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)
 
     //TODO remove this once it is supported in CEP library
     if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +163,6 @@ class DataStreamMatch(
           "pattern with either a simple variable or reluctant quantifier.")
     }
 
-    if (logicalMatch.interval != null) {
-      throw new TableException(
-        "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
-    }
-
     val inputDS: DataStream[Row] = timestampedInput
       .map(new CRowToRowMapFunction)
       .setParallelism(timestampedInput.getParallelism)
@@ -232,12 +252,9 @@ class DataStreamMatch(
       inputDs
     }
   }
-
-  @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
 }
 
-@VisibleForTesting
-private[flink] class PatternVisitor(
+private class PatternVisitor(
     config: TableConfig,
     inputTypeInfo: TypeInformation[Row],
     logicalMatch: MatchRecognize)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
index 9bf651f..53368c0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -20,10 +20,12 @@ package org.apache.flink.table.`match`
 
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
 import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.table.api.TableException
 import org.junit.Test
 
 class PatternTranslatorTest extends PatternTranslatorTestBase {
+
   @Test
   def simplePattern(): Unit = {
     verifyPattern(
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase {
       Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
   }
 
+  @Test
+  def testWithinClause(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10' MINUTE
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 60 * 1000)))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testWithinClauseWithYearMonthResolution(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      null /* don't care */)
+  }
+
   @Test(expected = classOf[TableException])
   def testReluctantOptionalNotSupported(): Unit = {
     verifyPattern(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
index 883ce0a..34b528c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
 import org.apache.flink.types.Row
 import org.apache.flink.util.TestLogger
 import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       fail("Expression is converted into more than a Match operation. Use a different test method.")
     }
 
-    val dataMatch = optimized
-      .asInstanceOf[DataStreamMatch]
-
-    val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
-    val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+    val dataMatch = optimized.asInstanceOf[DataStreamMatch]
+    val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1
 
     compare(expected, p)
   }
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
         currentRight.getAfterMatchSkipStrategy
 
+      val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight.getWindowTime != null) {
+        currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
+      } else {
+        currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+      }
+
       currentLeft = currentLeft.getPrevious
       currentRight = currentRight.getPrevious
 
-      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) {
         throw new ComparisonFailure("Compiled different pattern.",
           expected.toString,
           actual.toString)