You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/01/08 11:59:07 UTC

[flink] branch release-1.7 updated: [FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.7
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.7 by this push:
     new b195472  [FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE
b195472 is described below

commit b195472c0e513bc8afc36c5761edf2680e7a6f5b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 15:37:42 2018 +0100

    [FLINK-11191] [table] Check for ambiguous columns in MATCH_RECOGNIZE
    
    Added a validation that checks if no ambiguous columns are defined
    in MATCH_RECOGNIZE clause. Without the check there is a cryptic
    message thrown from code generation stack.
    
    This closes #7328.
---
 .../rules/datastream/DataStreamMatchRule.scala     | 61 +++++++++++++++++++++-
 .../table/match/MatchOperatorValidationTest.scala  | 29 ++++++++++
 2 files changed, 88 insertions(+), 2 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
index 5b0aa65..5c0241f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamMatchRule.scala
@@ -18,15 +18,23 @@
 
 package org.apache.flink.table.plan.rules.datastream
 
-import org.apache.calcite.plan.{RelOptRule, RelTraitSet}
+import java.util.{List => JList}
+
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.flink.table.api.TableException
+import org.apache.calcite.rex.{RexInputRef, RexNode}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.plan.logical.MatchRecognize
 import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.nodes.datastream.DataStreamMatch
 import org.apache.flink.table.plan.nodes.logical.FlinkLogicalMatch
 import org.apache.flink.table.plan.schema.RowSchema
+import org.apache.flink.table.plan.util.RexDefaultVisitor
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class DataStreamMatchRule
   extends ConverterRule(
@@ -35,6 +43,14 @@ class DataStreamMatchRule
     FlinkConventions.DATASTREAM,
     "DataStreamMatchRule") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val logicalMatch: FlinkLogicalMatch = call.rel(0).asInstanceOf[FlinkLogicalMatch]
+
+    // This check might be obsolete once CALCITE-2747 is resolved
+    validateAmbiguousColumns(logicalMatch)
+    true
+  }
+
   override def convert(rel: RelNode): RelNode = {
     val logicalMatch: FlinkLogicalMatch = rel.asInstanceOf[FlinkLogicalMatch]
     val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.DATASTREAM)
@@ -71,6 +87,47 @@ class DataStreamMatchRule
       new RowSchema(logicalMatch.getRowType),
       new RowSchema(logicalMatch.getInput.getRowType))
   }
+
+  private def validateAmbiguousColumns(logicalMatch: FlinkLogicalMatch): Unit = {
+    if (logicalMatch.isAllRows) {
+      throw new TableException("All rows per match mode is not supported yet.")
+    } else {
+      val refNameFinder = new RefNameFinder(logicalMatch.getInput.getRowType)
+      validateAmbiguousColumnsOnRowPerMatch(
+        logicalMatch.getPartitionKeys,
+        logicalMatch.getMeasures.keySet().asScala,
+        logicalMatch.getRowType,
+        refNameFinder)
+    }
+  }
+
+  private def validateAmbiguousColumnsOnRowPerMatch(
+      partitionKeys: JList[RexNode],
+      measuresNames: mutable.Set[String],
+      expectedSchema: RelDataType,
+      refNameFinder: RefNameFinder)
+    : Unit = {
+    val actualSize = partitionKeys.size() + measuresNames.size
+    val expectedSize = expectedSchema.getFieldCount
+    if (actualSize != expectedSize) {
+      //try to find ambiguous column
+
+      val ambiguousColumns = partitionKeys.asScala.map(_.accept(refNameFinder))
+        .filter(measuresNames.contains).mkString("{", ", ", "}")
+
+      throw new ValidationException(s"Columns ambiguously defined: $ambiguousColumns")
+    }
+  }
+
+  private class RefNameFinder(inputSchema: RelDataType) extends RexDefaultVisitor[String] {
+
+    override def visitInputRef(inputRef: RexInputRef): String = {
+      inputSchema.getFieldList.get(inputRef.getIndex).getName
+    }
+
+    override def visitNode(rexNode: RexNode): String =
+      throw new TableException(s"PARTITION BY clause accepts only input reference. Found $rexNode")
+  }
 }
 
 object DataStreamMatchRule {
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
index 3917bdf..4fa8b17 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/match/MatchOperatorValidationTest.scala
@@ -180,6 +180,35 @@ class MatchOperatorValidationTest extends TableTestBase {
   }
 
   @Test
+  def testValidatingAmbiguousColumns(): Unit = {
+    thrown.expectMessage("Columns ambiguously defined: {symbol, price}")
+    thrown.expect(classOf[ValidationException])
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM Ticker
+         |MATCH_RECOGNIZE (
+         |  PARTITION BY symbol, price
+         |  ORDER BY proctime
+         |  MEASURES
+         |    A.symbol AS symbol,
+         |    A.price AS price
+         |  PATTERN (A)
+         |  DEFINE
+         |    A AS symbol = 'a'
+         |) AS T
+         |""".stripMargin
+
+    streamUtils.tableEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+  }
+
+  // ***************************************************************************************
+  // * Those validations are temporary. We should remove those tests once we support those *
+  // * features.                                                                           *
+  // ***************************************************************************************
+
+  @Test
   def testPatternsProducingEmptyMatchesAreNotSupported(): Unit = {
     thrown.expectMessage("Patterns that can produce empty matches are not supported. " +
       "There must be at least one non-optional state.")