You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by tw...@apache.org on 2018/12/03 17:16:36 UTC
[flink] 01/02: [FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit d73b01b138dbc6dec078150d8cf6f9410fe68c7c
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Nov 28 13:31:59 2018 +0100
[FLINK-7603] [table] Support WITHIN clause in MATCH_RECOGNIZE
Introduces support for the WITHIN clause in MATCH_RECOGNIZE, which
allows adding a time constraint to a pattern. It reuses the
within function of Pattern in the CEP library. As a result, the
difference between the first row and the last row of a match
must be smaller than the given period. The WITHIN clause accepts
only constant day-time interval values (millisecond resolution);
year-month intervals are rejected.
---
.../plan/nodes/datastream/DataStreamMatch.scala | 41 ++++++++++-----
.../flink/table/match/PatternTranslatorTest.scala | 59 ++++++++++++++++++++++
.../table/match/PatternTranslatorTestBase.scala | 17 ++++---
3 files changed, 98 insertions(+), 19 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
index 19b298c..493c092 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamMatch.scala
@@ -28,6 +28,7 @@ import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
import org.apache.calcite.sql.SqlMatchRecognize.AfterOption
+import org.apache.calcite.sql.`type`.SqlTypeFamily
import org.apache.calcite.sql.fun.SqlStdOperatorTable._
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -39,6 +40,7 @@ import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty
import org.apache.flink.cep.pattern.conditions.BooleanConditions
import org.apache.flink.cep.{CEP, PatternStream}
import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.windowing.time.Time
import scala.collection.JavaConverters._
import org.apache.flink.table.api._
@@ -95,6 +97,31 @@ class DataStreamMatch(
explainMatch(super.explainTerms(pw), logicalMatch, inputSchema.fieldNames, getExpressionString)
}
+ private def translateTimeBound(interval: RexNode): Time = {
+ interval match {
+ case x: RexLiteral if x.getTypeName.getFamily == SqlTypeFamily.INTERVAL_DAY_TIME =>
+ Time.milliseconds(x.getValueAs(classOf[JLong]))
+ case _ =>
+ throw new TableException("Only constant intervals with millisecond resolution " +
+ "are supported as time constraints of patterns.")
+ }
+ }
+
+ @VisibleForTesting
+ private[flink] def translatePattern(
+ config: TableConfig,
+ inputTypeInfo: TypeInformation[Row]
+ ): (Pattern[Row, Row], Iterable[String]) = {
+ val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
+ val cepPattern = if (logicalMatch.interval != null) {
+ val interval = translateTimeBound(logicalMatch.interval)
+ logicalMatch.pattern.accept(patternVisitor).within(interval)
+ } else {
+ logicalMatch.pattern.accept(patternVisitor)
+ }
+ (cepPattern, patternVisitor.names)
+ }
+
override def translateToPlan(
tableEnv: StreamTableEnvironment,
queryConfig: StreamQueryConfig)
@@ -120,9 +147,7 @@ class DataStreamMatch(
crowInput,
logicalMatch.orderKeys)
- val patternVisitor = new PatternVisitor(config, inputTypeInfo, logicalMatch)
- val cepPattern = logicalMatch.pattern.accept(patternVisitor)
- val patternNames = patternVisitor.names
+ val (cepPattern, patternNames) = translatePattern(config, inputTypeInfo)
//TODO remove this once it is supported in CEP library
if (NFACompiler.canProduceEmptyMatches(cepPattern)) {
@@ -138,11 +163,6 @@ class DataStreamMatch(
"pattern with either a simple variable or reluctant quantifier.")
}
- if (logicalMatch.interval != null) {
- throw new TableException(
- "WITHIN clause is not part of the SQL Standard, thus it is not supported.")
- }
-
val inputDS: DataStream[Row] = timestampedInput
.map(new CRowToRowMapFunction)
.setParallelism(timestampedInput.getParallelism)
@@ -232,12 +252,9 @@ class DataStreamMatch(
inputDs
}
}
-
- @VisibleForTesting private[flink] def getLogicalMatch = logicalMatch
}
-@VisibleForTesting
-private[flink] class PatternVisitor(
+private class PatternVisitor(
config: TableConfig,
inputTypeInfo: TypeInformation[Row],
logicalMatch: MatchRecognize)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
index 9bf651f..53368c0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTest.scala
@@ -20,10 +20,12 @@ package org.apache.flink.table.`match`
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy._
import org.apache.flink.cep.pattern.Pattern
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.TableException
import org.junit.Test
class PatternTranslatorTest extends PatternTranslatorTestBase {
+
@Test
def simplePattern(): Unit = {
verifyPattern(
@@ -212,6 +214,63 @@ class PatternTranslatorTest extends PatternTranslatorTestBase {
Pattern.begin("A\"", skipToNext()).optional().next("\u006C").next("C"))
}
+ @Test
+ def testWithinClause(): Unit = {
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10 00:00:00.004' DAY TO SECOND
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000 + 4)))
+
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10 00' DAY TO HOUR
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 24 * 60 * 60 * 1000)))
+
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '10' MINUTE
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ Pattern.begin("A", skipToNext()).next("B")
+ .within(Time.milliseconds(10 * 60 * 1000)))
+ }
+
+ @Test(expected = classOf[TableException])
+ def testWithinClauseWithYearMonthResolution(): Unit = {
+ verifyPattern(
+ """MATCH_RECOGNIZE (
+ | ORDER BY proctime
+ | MEASURES
+ | A.f0 AS aid
+ | PATTERN (A B) WITHIN INTERVAL '2-10' YEAR TO MONTH
+ | DEFINE
+ | A as A.f0 = 1
+ |) AS T
+ |""".stripMargin,
+ null /* don't care */)
+ }
+
@Test(expected = classOf[TableException])
def testReluctantOptionalNotSupported(): Unit = {
verifyPattern(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
index 883ce0a..34b528c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/PatternTranslatorTestBase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironm
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.{TableConfig, TableEnvironment}
import org.apache.flink.table.calcite.FlinkPlannerImpl
-import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan, PatternVisitor}
+import org.apache.flink.table.plan.nodes.datastream.{DataStreamMatch, DataStreamScan}
import org.apache.flink.types.Row
import org.apache.flink.util.TestLogger
import org.junit.Assert._
@@ -89,11 +89,8 @@ abstract class PatternTranslatorTestBase extends TestLogger{
fail("Expression is converted into more than a Match operation. Use a different test method.")
}
- val dataMatch = optimized
- .asInstanceOf[DataStreamMatch]
-
- val pVisitor = new PatternVisitor(new TableConfig, testTableTypeInfo, dataMatch.getLogicalMatch)
- val p = dataMatch.getLogicalMatch.pattern.accept(pVisitor)
+ val dataMatch = optimized.asInstanceOf[DataStreamMatch]
+ val p = dataMatch.translatePattern(new TableConfig, testTableTypeInfo)._1
compare(expected, p)
}
@@ -108,10 +105,16 @@ abstract class PatternTranslatorTestBase extends TestLogger{
val sameSkipStrategy = currentLeft.getAfterMatchSkipStrategy ==
currentRight.getAfterMatchSkipStrategy
+ val sameTimeWindow = if (currentLeft.getWindowTime != null && currentRight != null) {
+ currentLeft.getWindowTime.toMilliseconds == currentRight.getWindowTime.toMilliseconds
+ } else {
+ currentLeft.getWindowTime == null && currentRight.getWindowTime == null
+ }
+
currentLeft = currentLeft.getPrevious
currentRight = currentRight.getPrevious
- if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy) {
+ if (!sameName || !sameQuantifier || !sameTimes || !sameSkipStrategy || !sameTimeWindow) {
throw new ComparisonFailure("Compiled different pattern.",
expected.toString,
actual.toString)