You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:34 UTC

[flink] 01/11: [FLINK-9738][table] Provide a way to define Temporal Table Functions in Table API

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f2a67a1682249d83711030f4e55b824cb18336d7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 5 19:47:01 2018 +0200

    [FLINK-9738][table] Provide a way to define Temporal Table Functions in Table API
---
 .../scala/org/apache/flink/table/api/table.scala   | 73 ++++++++++++++++++-
 .../flink/table/expressions/fieldExpression.scala  | 13 ++--
 .../table/functions/TemporalTableFunction.scala    | 80 +++++++++++++++++++++
 .../flink/table/plan/logical/operators.scala       | 14 ++++
 .../logical/FlinkLogicalTableFunctionScan.scala    | 36 ++++++++--
 .../api/stream/table/TemporalTableJoinTest.scala   | 84 ++++++++++++++++++++++
 .../TemporalTableJoinValidationTest.scala          | 56 +++++++++++++++
 7 files changed, 344 insertions(+), 12 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index a44bbaa..6c96834 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -21,7 +21,8 @@ import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.operators.join.JoinType
 import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.functions.TemporalTableFunction
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.plan.ProjectionTranslator._
 import org.apache.flink.table.plan.logical.{Minus, _}
@@ -156,6 +157,75 @@ class Table(
   }
 
   /**
+    * Creates a [[TemporalTableFunction]] backed by this table as a history table. Temporal
+    * Tables represent a concept of a table that changes over time and for which Flink keeps
+    * track of those changes. [[TemporalTableFunction]] provides a way to access that data.
+    *
+    * For more information please check Flink's documentation on Temporal Tables.
+    * Currently [[TemporalTableFunction]]s are only supported in streaming.
+    *
+    * @param timeAttribute Expression string that must point to a time attribute. Used to
+    *                      determine which records are newer or older versions.
+    * @param primaryKey    Expression string defining the primary key. With a primary key it
+    *                      is possible to update or delete a row.
+    * @return [[TemporalTableFunction]] which is an instance of
+    *        [[org.apache.flink.table.functions.TableFunction]]. It takes a single argument,
+    *        the `timeAttribute`, for which it returns the matching version of the [[Table]]
+    *        from which the [[TemporalTableFunction]] was created.
+    * @throws ValidationException if an argument cannot be resolved against this table or
+    *                             `primaryKey` is not a top-level field reference.
+    */
+  def createTemporalTableFunction(
+      timeAttribute: String,
+      primaryKey: String): TemporalTableFunction = {
+    val timeAttributeExpr = ExpressionParser.parseExpression(timeAttribute)
+    val primaryKeyExpr = ExpressionParser.parseExpression(primaryKey)
+    createTemporalTableFunction(timeAttributeExpr, primaryKeyExpr)
+  }
+
+  /**
+    * Creates a [[TemporalTableFunction]] backed by this table as a history table. Temporal
+    * Tables represent a concept of a table that changes over time and for which Flink keeps
+    * track of those changes. [[TemporalTableFunction]] provides a way to access that data.
+    *
+    * For more information please check Flink's documentation on Temporal Tables.
+    * Currently [[TemporalTableFunction]]s are only supported in streaming.
+    *
+    * @param timeAttribute Must point to a time attribute. Used to determine which records
+    *                      are newer or older versions.
+    * @param primaryKey    Defines the primary key; must be a top-level field reference.
+    *                      With a primary key it is possible to update or delete a row.
+    * @return [[TemporalTableFunction]] which is an instance of
+    *        [[org.apache.flink.table.functions.TableFunction]]. It takes a single argument,
+    *        the `timeAttribute`, for which it returns the matching version of the [[Table]]
+    *        from which the [[TemporalTableFunction]] was created.
+    * @throws ValidationException if an argument cannot be resolved against this table or
+    *                             `primaryKey` is not a top-level field reference.
+    */
+  def createTemporalTableFunction(
+      timeAttribute: Expression,
+      primaryKey: Expression): TemporalTableFunction = {
+    // Resolve both expressions against this table's logical plan; unresolvable fields are
+    // reported as a ValidationException by the validation step.
+    val validated = TemporalTable(timeAttribute, primaryKey, logicalPlan).validate(tableEnv)
+    val temporalTable = validated.asInstanceOf[TemporalTable]
+
+    // Only a top-level field reference is accepted as the primary key.
+    val primaryKeyName = validatePrimaryKeyExpression(temporalTable.primaryKey)
+    TemporalTableFunction.create(this, temporalTable.timeAttribute, primaryKeyName)
+  }
+
+  private def validatePrimaryKeyExpression(expression: Expression): String = {
+    // Only a resolved, top-level field reference can name a primary key column.
+    expression match {
+      case fieldReference: ResolvedFieldReference => fieldReference.name
+      case unsupported => throw new ValidationException(
+        s"Unsupported expression [$unsupported] as primary key. " +
+          "Only top-level (not nested) field references are supported.")
+    }
+  }
+
+  /**
     * Renames the fields of the expression result. Use this to disambiguate fields before
     * joining to operations.
     *
@@ -1178,5 +1248,4 @@ class WindowGroupedTable(
     val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
     select(withResolvedAggFunctionCall: _*)
   }
-
 }
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index f3ef039..3de0175 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -174,9 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
         ValidationSuccess
       case WindowReference(_, _) =>
         ValidationFailure("Reference to a rowtime or proctime window required.")
-      case _ =>
+      case any =>
         ValidationFailure(
-          "The '.rowtime' expression can only be used for table definitions and windows.")
+          s"The '.rowtime' expression can only be used for table definitions and windows, " +
+            s"while [$any] was found.")
     }
   }
 
@@ -189,8 +190,7 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
         // batch time window
         Types.SQL_TIMESTAMP
       case _ =>
-        throw TableException("WindowReference of RowtimeAttribute has invalid type. " +
-          "Please report this bug.")
+        throw TableException("RowtimeAttribute has invalid type. Please report this bug.")
     }
   }
 
@@ -208,9 +208,10 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
         ValidationSuccess
       case WindowReference(_, _) =>
         ValidationFailure("Reference to a rowtime or proctime window required.")
-      case _ =>
+      case any =>
         ValidationFailure(
-          "The '.proctime' expression can only be used for table definitions and windows.")
+          "The '.proctime' expression can only be used for table definitions and windows, " +
+            s"while [$any] was found.")
     }
   }
 
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala
new file mode 100644
index 0000000..7943144
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.types.Row
+
+/**
+  * A table function representing a temporal table over some history table. It takes a single
+  * argument, the `timeAttribute`, for which it returns the matching version of the
+  * `underlyingHistoryTable` from which this [[TemporalTableFunction]] was created.
+  *
+  * This function must not be evaluated. Instead, calls to it are expected to be rewritten by
+  * the optimizer into other operators (like Temporal Table Join).
+  */
+class TemporalTableFunction private(
+    @transient private val underlyingHistoryTable: Table,
+    private val timeAttribute: Expression,
+    private val primaryKey: String,
+    private val resultType: RowTypeInfo)
+  extends TableFunction[Row] {
+
+  // Declares the single `Timestamp` (time attribute) argument. Calls are expected to be
+  // rewritten before execution, so this always throws.
+  def eval(row: Timestamp): Unit = {
+    throw new IllegalStateException("This should never be called")
+  }
+
+  override def getResultType: RowTypeInfo = resultType
+
+  def getTimeAttribute: Expression = timeAttribute
+
+  def getPrimaryKey: String = primaryKey
+
+  /**
+    * Returns the history table this function was created from. The field is `@transient`, so
+    * it is null in a deserialized copy of this function, in which case this method throws.
+    */
+  private[flink] def getUnderlyingHistoryTable: Table = {
+    if (underlyingHistoryTable == null) {
+      throw new IllegalStateException("Accessing table field after planning/serialization")
+    }
+    underlyingHistoryTable
+  }
+}
+
+object TemporalTableFunction {
+  /**
+    * Creates a [[TemporalTableFunction]] over `table`; its result type is a row type with the
+    * same field names and types as the schema of `table`.
+    */
+  def create(
+      table: Table,
+      timeAttribute: Expression,
+      primaryKey: String): TemporalTableFunction = {
+    val schema = table.getSchema
+    val resultType = new RowTypeInfo(schema.getTypes, schema.getColumnNames)
+    new TemporalTableFunction(table, timeAttribute, primaryKey, resultType)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 7579621..84e3f79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -686,6 +686,20 @@ case class WindowAggregate(
   }
 }
 
+case class TemporalTable(
+    timeAttribute: Expression,
+    primaryKey: Expression,
+    child: LogicalNode)
+  extends UnaryNode {
+
+  // Validation-only node: exposes the child's output unchanged and cannot be translated.
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
+    throw new UnsupportedOperationException(
+      "This should never be called. This node is supposed to be used only for validation")
+}
+
 /**
   * LogicalNode for calling a user-defined table functions.
   *
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 4bf1ca1..0ed2301 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -21,14 +21,16 @@ package org.apache.flink.table.plan.nodes.logical
 import java.lang.reflect.Type
 import java.util.{List => JList, Set => JSet}
 
-import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{TableFunctionScan, TableScan}
-import org.apache.calcite.rel.logical.{LogicalTableFunctionScan, LogicalTableScan}
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
 import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.RexNode
+import org.apache.calcite.rex.{RexCall, RexNode, RexVisitorImpl}
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.plan.nodes.FlinkConventions
 
 class FlinkLogicalTableFunctionScan(
@@ -74,6 +76,32 @@ class FlinkLogicalTableFunctionScanConverter
     FlinkConventions.LOGICAL,
     "FlinkLogicalTableFunctionScanConverter") {
 
+  /**
+    * This rule does not match calls to a [[TemporalTableFunction]]: reading from one as a
+    * regular TableFunction is not supported. Such calls are expected to be rewritten into a
+    * [[org.apache.flink.table.plan.nodes.datastream.DataStreamScan]] followed by, for
+    * example, a [[org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin]].
+    * @return false for a temporal table function call, true for any other scan
+    */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    val scan = call.rel[LogicalTableFunctionScan](0)
+    !isTemporalTableFunctionCall(scan)
+  }
+
+  private def isTemporalTableFunctionCall(logicalTableFunction: LogicalTableFunctionScan)
+    : Boolean = {
+    // Matches a RexCall whose TableSqlFunction operator wraps a TemporalTableFunction.
+    logicalTableFunction.getCall match {
+      case rexCall: RexCall =>
+        rexCall.getOperator match {
+          case tableFunction: TableSqlFunction =>
+            tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
   def convert(rel: RelNode): RelNode = {
     val scan = rel.asInstanceOf[LogicalTableFunctionScan]
     val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
new file mode 100644
index 0000000..0942dd3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.table
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.ResolvedFieldReference
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.utils._
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.Test
+
+class TemporalTableJoinTest extends TableTestBase {
+
+  val util: TableTestUtil = streamTestUtil()
+
+  val ratesHistory = util.addTable[(String, Int, Timestamp)](
+    "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+
+  val rates = util.addFunction(
+    "Rates",
+    ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+
+  @Test
+  def testTemporalTableFunctionScan(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Cannot translate a query with an unbounded table function call")
+    // Scanning a temporal table function on its own (outside of a join) is not supported.
+    val result = rates(Timestamp.valueOf("2016-06-27 10:10:42.123"))
+    util.printTable(result)
+  }
+
+  @Test
+  def testProcessingTimeIndicatorVersion(): Unit = {
+    val util: TableTestUtil = streamTestUtil()
+    val ratesHistory = util.addTable[(String, Int)](
+      "RatesHistory", 'currency, 'rate, 'proctime.proctime)
+    val rates = ratesHistory.createTemporalTableFunction('proctime, 'currency)
+    assertRatesFunction(ratesHistory.getSchema, rates, proctime = true)
+  }
+
+  @Test
+  def testValidStringFieldReference(): Unit = {
+    val rates = ratesHistory.createTemporalTableFunction("rowtime", "currency")
+    assertRatesFunction(ratesHistory.getSchema, rates)
+  }
+
+  private def assertRatesFunction(
+      expectedSchema: TableSchema,
+      rates: TemporalTableFunction,
+      proctime: Boolean = false): Unit = {
+    assertEquals("currency", rates.getPrimaryKey)
+    assertTrue(rates.getTimeAttribute.isInstanceOf[ResolvedFieldReference])
+    assertEquals(
+      if (proctime) "proctime" else "rowtime",
+      rates.getTimeAttribute.asInstanceOf[ResolvedFieldReference].name)
+    assertArrayEquals(
+      expectedSchema.getColumnNames.asInstanceOf[Array[Object]],
+      rates.getResultType.getFieldNames.asInstanceOf[Array[Object]])
+    assertArrayEquals(
+      expectedSchema.getTypes.asInstanceOf[Array[Object]],
+      rates.getResultType.getFieldTypes.asInstanceOf[Array[Object]])
+  }
+}
+
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
new file mode 100644
index 0000000..71b1585
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils._
+import org.junit.Test
+
+class TemporalTableJoinValidationTest extends TableTestBase {
+
+  val util: TableTestUtil = streamTestUtil()
+
+  // NOTE(review): `orders` is registered but not referenced by the tests in this class.
+  val orders = util.addTable[(Long, String, Timestamp)](
+    "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+
+  val ratesHistory = util.addTable[(String, Int, Timestamp)](
+    "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+
+  @Test
+  def testInvalidFieldReference(): Unit =
+    expectUnresolvedField(ratesHistory.createTemporalTableFunction('rowtime, 'foobar))
+
+  @Test
+  def testInvalidStringFieldReference(): Unit =
+    expectUnresolvedField(ratesHistory.createTemporalTableFunction("rowtime", "foobar"))
+
+  /** Runs `action`, expecting it to fail because the field `foobar` cannot be resolved. */
+  private def expectUnresolvedField(action: => Any): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("Cannot resolve field [foobar]")
+    action
+  }
+}
+
+