You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2018/12/03 17:16:36 UTC

[flink] 01/02: [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d73b01b138dbc6dec078150d8cf6f9410fe68c7c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Nov 28 13:31:59 2018 +0100

    [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
    
    Introduces support for WITHIN clause in MATCH_RECOGNIZE that
    allows adding a time constraint for a pattern. It reuses the
    within function of Pattern in the CEP library. The behavior
    is such that the difference between first row in a match and
    last row in a match must be smaller than the given period. The
    WITHIN clause accepts only a constant millisecond interval value.
---
 .../plan/nodes/datastream/DataStreamMatch.scala    | 41 ++++++++++-----
 .../flink/table/match/PatternTranslatorTest.scala  | 59 ++++++++++++++++++++++
 .../table/match/PatternTranslatorTestBase.scala    | 17 ++++---
 3 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
index 19b298c..493c092 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
 import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.`type`.SqlTypeFamily
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
 import org.apache.flink.cep.pattern.conditions.BooleanConditions
 import org.apache.flink.cep.{CEP, PatternStream}
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.windowing.time.Time
 
 import scala.collection.JavaConverters._
 import org.apache.flink.table.api._
@@ -95,6 +97,31 @@ class DataStreamMatch(
     explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
   }
 
+  private def translateTimeBound(interval: RexNode): Time = {
+    interval match {
+      case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
+        Time.milliseconds(x.getValueAs(classOf[JLong]))
+      case _ =>
+        throw new TableException("Only constant intervals with millisecond resolution " +
+          "are supported as time constraints of patterns.")
+    }
+  }
+
+  @VisibleForTesting
+  private[flink] def translatePattern(
+    config: TableConfig,
+    inputTypeInfo: TypeInformation[Row]
+  ): (Pattern[Row, Row], Iterable[String]) = {
+    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
+    val cepPattern = if (logicalMatch.interval != null) {
+      val interval = translateTimeBound(logicalMatch.interval)
+      logicalMatch.pattern.accept(patternVisitor).within(interval)
+    } else {
+      logicalMatch.pattern.accept(patternVisitor)
+    }
+    (cepPattern, patternVisitor.names)
+  }
+
   override def translateToPlan(
       tableEnv: StreamTableEnvironment,
       queryConfig: StreamQueryConfig)
@@ -120,9 +147,7 @@ class DataStreamMatch(
       crowInput,
       logicalMatch.orderKeys)
 
-    val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
-    val cepPattern = logicalMatch.pattern.accept(patternVisitor)
-    val patternNames = patternVisitor.names
+    val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)
 
     //TODO remove this once it is supported in CEP library
     if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +163,6 @@ class DataStreamMatch(
           "pattern with either a simple variable or reluctant quantifier.")
     }
 
-    if (logicalMatch.interval != null) {
-      throw new TableException(
-        "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
-    }
-
     val inputDS: DataStream[Row] = timestampedInput
       .map(new CRowToRowMapFunction)
       .setParallelism(timestampedInput.getParallelism)
@@ -232,12 +252,9 @@ class DataStreamMatch(
       inputDs
     }
   }
-
-  @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
 }
 
-@VisibleForTesting
-private[flink] class PatternVisitor(
+private class PatternVisitor(
     config: TableConfig,
     inputTypeInfo: TypeInformation[Row],
     logicalMatch: MatchRecognize)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
index 9bf651f..53368c0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -20,10 +20,12 @@ package org.apache.flink.table.`match`
 
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
 import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.table.api.TableException
 import org.junit.Test
 
 class PatternTranslatorTest extends PatternTranslatorTestBase {
+
   @Test
   def simplePattern(): Unit = {
     verifyPattern(
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase {
       Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
   }
 
+  @Test
+  def testWithinClause(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))
+
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '10' MINUTE
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      Pattern.begin("A", skipToNext()).next("B")
+        .within(Time.milliseconds(10 * 60 * 1000)))
+  }
+
+  @Test(expected = classOf[TableException])
+  def testWithinClauseWithYearMonthResolution(): Unit = {
+    verifyPattern(
+      """MATCH_RECOGNIZE (
+        |  ORDER BY proctime
+        |  MEASURES
+        |    A.f0 AS aid
+        |  PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
+        |  DEFINE
+        |    A as A.f0 = 1
+        |) AS T
+        |""".stripMargin,
+      null /* don't care */)
+  }
+
   @Test(expected = classOf[TableException])
   def testReluctantOptionalNotSupported(): Unit = {
     verifyPattern(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
index 883ce0a..34b528c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, TableEnvironment}
 import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
 import org.apache.flink.types.Row
 import org.apache.flink.util.TestLogger
 import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       fail("Expression is converted into more than a Match operation. Use a different test method.")
     }
 
-    val dataMatch = optimized
-      .asInstanceOf[DataStreamMatch]
-
-    val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
-    val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+    val dataMatch = optimized.asInstanceOf[DataStreamMatch]
+    val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1
 
     compare(expected, p)
   }
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{
       val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
         currentRight.getAfterMatchSkipStrategy
 
+      val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight.getWindowTime != null) {
+        currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
+      } else {
+        currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+      }
+
       currentLeft = currentLeft.getPrevious
       currentRight = currentRight.getPrevious
 
-      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+      if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) {
         throw new ComparisonFailure("Compiled different pattern.",
           expected.toString,
           actual.toString)