You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/08 11:59:07 UTC
[flink] branch release-1.7 updated: [FLINK-11191] [table] Check for
ambiguous columns in MATCH_RECOGNIZE
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.7 by this push:
new b195472 [FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE
b195472 is described below
commit b195472c0e513bc8afc36c5761edf2680e7a6f5b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 15:37:42 2018 +0100
[FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE
Added a validation that checks that no ambiguous columns are defined
in the MATCH_RECOGNIZE clause. Without this check, a cryptic error
message is thrown from the code generation stack.
This closes #7328.
---
.../rules/datastream/DataStreamMatchRule.scala | 61 +++++++++++++++++++++-
.../table/match/MatchOperatorValidationTest.scala | 29 ++++++++++
2 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
index 5b0aa65..5c0241f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
@@ -18,15 +18,23 @@
package org.apache.flink.table.plan.rules.datastream
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import java.util.{List => JList}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.TableException
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.plan.logical.MatchRecognize
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
class DataStreamMatchRule
extends ConverterRule(
@@ -35,6 +43,14 @@ class DataStreamMatchRule
FlinkConventions.DATASTREAM,
"DataStreamMatchRule") {
+  /**
+   * Validates the MATCH_RECOGNIZE node before the planner lets this rule convert it.
+   *
+   * Ambiguously defined columns are reported here as a [[ValidationException]]; otherwise they
+   * would surface as a cryptic error from the code generation stack. This check might become
+   * obsolete once CALCITE-2747 is resolved.
+   *
+   * @param call rule call whose first operand is the [[FlinkLogicalMatch]] to validate
+   * @return always true; an invalid clause makes this method throw instead of returning false
+   */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val logicalMatch: FlinkLogicalMatch = call.rel(0)
+    validateAmbiguousColumns(logicalMatch)
+    true
+  }
+
override def convert(rel: RelNode): RelNode = {
val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch]
val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
@@ -71,6 +87,47 @@ class DataStreamMatchRule
new RowSchema(logicalMatch.getRowType),
new RowSchema(logicalMatch.getInput.getRowType))
}
+
+  /**
+   * Validates that the output columns of a MATCH_RECOGNIZE clause are defined unambiguously.
+   *
+   * @param logicalMatch the MATCH_RECOGNIZE node to validate
+   * @throws TableException if the clause uses ALL ROWS PER MATCH, which is not supported yet
+   * @throws ValidationException if the output schema does not have one column per partition
+   *                             key and per measure, i.e. some column is defined ambiguously
+   */
+  private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = {
+    if (logicalMatch.isAllRows) {
+      throw new TableException("All rows per match mode is not supported yet.")
+    }
+
+    // Partition keys are input references, so their names are looked up in the input row type.
+    val inputNameFinder = new RefNameFinder(logicalMatch.getInput.getRowType)
+    validateAmbiguousColumnsOnRowPerMatch(
+      partitionKeys = logicalMatch.getPartitionKeys,
+      measuresNames = logicalMatch.getMeasures.keySet().asScala,
+      expectedSchema = logicalMatch.getRowType,
+      refNameFinder = inputNameFinder)
+  }
+
+  /**
+   * Checks that the ONE ROW PER MATCH output schema has exactly one column per partition key
+   * and per measure. A size mismatch is taken as a sign that some column name is defined more
+   * than once (e.g. a measure named like a partition key); see CALCITE-2747.
+   *
+   * @param partitionKeys  PARTITION BY expressions; each must be a plain input reference
+   * @param measuresNames  names of the MEASURES columns
+   * @param expectedSchema output row type of the MATCH_RECOGNIZE node
+   * @param refNameFinder  resolves a partition key to the name of the input field it references
+   * @throws ValidationException if the column counts differ; the message lists every name that
+   *                             is defined more than once, each name reported once
+   * @throws TableException if, on the error path, a partition key is not an input reference
+   */
+  private def validateAmbiguousColumnsOnRowPerMatch(
+      partitionKeys: JList[RexNode],
+      measuresNames: mutable.Set[String],
+      expectedSchema: RelDataType,
+      refNameFinder: RefNameFinder)
+    : Unit = {
+    val actualSize = partitionKeys.size() + measuresNames.size
+    val expectedSize = expectedSchema.getFieldCount
+    if (actualSize != expectedSize) {
+      // Partition keys come first, so duplicates are reported in PARTITION BY order.
+      val columnNames = partitionKeys.asScala.map(_.accept(refNameFinder)) ++ measuresNames
+      // Counting across both lists also catches a name repeated within PARTITION BY, and
+      // `distinct` keeps each ambiguous name in the message only once.
+      val ambiguousColumns = columnNames.distinct.filter(name => columnNames.count(_ == name) > 1)
+
+      val message =
+        if (ambiguousColumns.isEmpty) {
+          s"Number of partition keys and measures ($actualSize) does not match the number " +
+            s"of output columns ($expectedSize)."
+        } else {
+          s"Columns ambiguously defined: ${ambiguousColumns.mkString("{", ", ", "}")}"
+        }
+      throw new ValidationException(message)
+    }
+  }
+
+  /**
+   * Resolves a PARTITION BY expression to the name of the input field it references.
+   *
+   * Only [[RexInputRef]] is accepted; every other kind of expression falls through to
+   * `visitNode` and is rejected with a [[TableException]].
+   *
+   * @param inputSchema row type of the MATCH_RECOGNIZE input, indexed by the input references
+   */
+  private class RefNameFinder(inputSchema: RelDataType) extends RexDefaultVisitor[String] {
+
+    override def visitInputRef(inputRef: RexInputRef): String = {
+      val fieldNames = inputSchema.getFieldNames
+      fieldNames.get(inputRef.getIndex)
+    }
+
+    override def visitNode(rexNode: RexNode): String = {
+      throw new TableException(s"PARTITION BY clause accepts only input reference. Found $rexNode")
+    }
+  }
}
object DataStreamMatchRule {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
index 3917bdf..4fa8b17 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
@@ -180,6 +180,35 @@ class MatchOperatorValidationTest extends TableTestBase {
}
@Test
+  def testValidatingAmbiguousColumns(): Unit = {
+    // "symbol" and "price" are used both as partition keys and as measure names, so each of
+    // them is defined twice for the ONE ROW PER MATCH output.
+    val sqlQuery =
+      """
+        |SELECT *
+        |FROM Ticker
+        |MATCH_RECOGNIZE (
+        | PARTITION BY symbol, price
+        | ORDER BY proctime
+        | MEASURES
+        | A.symbol AS symbol,
+        | A.price AS price
+        | PATTERN (A)
+        | DEFINE
+        | A AS symbol = 'a'
+        |) AS T
+        |""".stripMargin
+
+    thrown.expect(classOf[ValidationException])
+    thrown.expectMessage("Columns ambiguously defined: {symbol, price}")
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+ // ***************************************************************************************
+ // * These validations are temporary. We should remove these tests once we support these *
+ // * features. *
+ // ***************************************************************************************
+
+ @Test
def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = {
thrown.expectMessage("Patterns that can produce empty matches are not supported. " +
"There must be at least one non-optional state.")