You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:34 UTC
[flink] 01/11: [FLINK-9738][table] Provide a way to define Temporal
Table Functions in Table API
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit f2a67a1682249d83711030f4e55b824cb18336d7
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Thu Jul 5 19:47:01 2018 +0200
[FLINK-9738][table] Provide a way to define Temporal Table Functions in Table API
---
.../scala/org/apache/flink/table/api/table.scala | 73 ++++++++++++++++++-
.../flink/table/expressions/fieldExpression.scala | 13 ++--
.../table/functions/TemporalTableFunction.scala | 80 +++++++++++++++++++++
.../flink/table/plan/logical/operators.scala | 14 ++++
.../logical/FlinkLogicalTableFunctionScan.scala | 36 ++++++++--
.../api/stream/table/TemporalTableJoinTest.scala | 84 ++++++++++++++++++++++
.../TemporalTableJoinValidationTest.scala | 56 +++++++++++++++
7 files changed, 344 insertions(+), 12 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index a44bbaa..6c96834 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -21,7 +21,8 @@ import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, ResolvedFieldReference, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
+import org.apache.flink.table.functions.TemporalTableFunction
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical.{Minus, _}
@@ -156,6 +157,75 @@ class Table(
}
/**
+ * Creates a [[TemporalTableFunction]] backed by this table as a history table.
+ * Temporal Tables represent a concept of a table that changes over time and for which
+ * Flink keeps track of those changes. A [[TemporalTableFunction]] provides a way to access
+ * that data.
+ *
+ * For more information please check Flink's documentation on Temporal Tables.
+ *
+ * Currently [[TemporalTableFunction]]s are only supported in streaming.
+ *
+ * @param timeAttribute Must point to a time attribute. Provides a way to determine which
+ * records are newer or older versions.
+ * @param primaryKey Defines the primary key. With primary key it is possible to update
+ * a row or to delete it.
+ * @return [[TemporalTableFunction]] which is an instance of
+ * [[org.apache.flink.table.functions.TableFunction]]. It takes one single argument,
+ * the `timeAttribute`, for which it returns matching version of the [[Table]], from which
+ * [[TemporalTableFunction]] was created.
+ */
+  def createTemporalTableFunction(
+      timeAttribute: String,
+      primaryKey: String): TemporalTableFunction = {
+    // Parse both textual arguments first, then delegate to the Expression-based overload.
+    val timeAttributeExpression = ExpressionParser.parseExpression(timeAttribute)
+    val primaryKeyExpression = ExpressionParser.parseExpression(primaryKey)
+    createTemporalTableFunction(timeAttributeExpression, primaryKeyExpression)
+  }
+
+ /**
+ * Creates a [[TemporalTableFunction]] backed by this table as a history table.
+ * Temporal Tables represent a concept of a table that changes over time and for which
+ * Flink keeps track of those changes. A [[TemporalTableFunction]] provides a way to access
+ * that data.
+ *
+ * For more information please check Flink's documentation on Temporal Tables.
+ *
+ * Currently [[TemporalTableFunction]]s are only supported in streaming.
+ *
+ * @param timeAttribute Must point to a time indicator. Provides a way to determine which
+ * records are newer or older versions.
+ * @param primaryKey Defines the primary key. With primary key it is possible to update
+ * a row or to delete it.
+ * @return [[TemporalTableFunction]] which is an instance of
+ * [[org.apache.flink.table.functions.TableFunction]]. It takes one single argument,
+ * the `timeAttribute`, for which it returns matching version of the [[Table]], from which
+ * [[TemporalTableFunction]] was created.
+ */
+  def createTemporalTableFunction(
+      timeAttribute: Expression,
+      primaryKey: Expression): TemporalTableFunction = {
+    // Wrap this table's plan in a TemporalTable node and validate it; the validated
+    // node's expressions (not the raw arguments) are what the function is built from.
+    val validated = TemporalTable(timeAttribute, primaryKey, logicalPlan)
+      .validate(tableEnv)
+      .asInstanceOf[TemporalTable]
+    val primaryKeyName = validatePrimaryKeyExpression(validated.primaryKey)
+
+    TemporalTableFunction.create(this, validated.timeAttribute, primaryKeyName)
+  }
+
+  /**
+    * Extracts the field name from a primary key expression.
+    *
+    * @param expression the (already validated) primary key expression
+    * @return the name of the referenced field
+    * @throws ValidationException if the expression is not a [[ResolvedFieldReference]]
+    */
+  private def validatePrimaryKeyExpression(expression: Expression): String = expression match {
+    case resolved: ResolvedFieldReference => resolved.name
+    case _ =>
+      throw new ValidationException(
+        s"Unsupported expression [$expression] as primary key. " +
+          s"Only top-level (not nested) field references are supported.")
+  }
+
+ /**
* Renames the fields of the expression result. Use this to disambiguate fields before
* joining to operations.
*
@@ -1178,5 +1248,4 @@ class WindowGroupedTable(
val withResolvedAggFunctionCall = fieldExprs.map(replaceAggFunctionCall(_, table.tableEnv))
select(withResolvedAggFunctionCall: _*)
}
-
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index f3ef039..3de0175 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -174,9 +174,10 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
ValidationSuccess
case WindowReference(_, _) =>
ValidationFailure("Reference to a rowtime or proctime window required.")
- case _ =>
+ case any =>
ValidationFailure(
- "The '.rowtime' expression can only be used for table definitions and windows.")
+ s"The '.rowtime' expression can only be used for table definitions and windows, " +
+ s"while [$any] was found.")
}
}
@@ -189,8 +190,7 @@ case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
// batch time window
Types.SQL_TIMESTAMP
case _ =>
- throw TableException("WindowReference of RowtimeAttribute has invalid type. " +
- "Please report this bug.")
+ throw TableException("RowtimeAttribute has invalid type. Please report this bug.")
}
}
@@ -208,9 +208,10 @@ case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
ValidationSuccess
case WindowReference(_, _) =>
ValidationFailure("Reference to a rowtime or proctime window required.")
- case _ =>
+ case any =>
ValidationFailure(
- "The '.proctime' expression can only be used for table definitions and windows.")
+ "The '.proctime' expression can only be used for table definitions and windows, " +
+ s"while [$any] was found.")
}
}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala
new file mode 100644
index 0000000..7943144
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TemporalTableFunction.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.types.Row
+
+/**
+  * A [[TableFunction]] that represents a temporal view over a history table.
+  * Its single argument is the `timeAttribute` value for which the matching version of the
+  * `underlyingHistoryTable` is requested.
+  *
+  * The function itself is never meant to be evaluated: `eval` always throws. Calls to it are
+  * expected to be rewritten by the optimiser into other operators (like Temporal Table Join).
+  *
+  * Instances are obtained through the companion's `create` method; the constructor is private.
+  */
+class TemporalTableFunction private(
+    @transient private val underlyingHistoryTable: Table,
+    private val timeAttribute: Expression,
+    private val primaryKey: String,
+    private val resultType: RowTypeInfo)
+  extends TableFunction[Row] {
+
+  /** Placeholder evaluation method; always fails with an [[IllegalStateException]]. */
+  def eval(row: Timestamp): Unit =
+    throw new IllegalStateException("This should never be called")
+
+  /** Row type of the function's result, as supplied at construction time. */
+  override def getResultType: RowTypeInfo = resultType
+
+  /** Expression identifying the time attribute of the history table. */
+  def getTimeAttribute: Expression = timeAttribute
+
+  /** Name of the primary key field of the history table. */
+  def getPrimaryKey: String = primaryKey
+
+  /**
+    * Returns the history table this function was created from.
+    *
+    * The table field is `@transient`, so it can be null; in that case an
+    * [[IllegalStateException]] is thrown instead of returning null.
+    */
+  private[flink] def getUnderlyingHistoryTable: Table =
+    Option(underlyingHistoryTable).getOrElse(
+      throw new IllegalStateException("Accessing table field after planing/serialization"))
+}
+
+object TemporalTableFunction {
+
+  /**
+    * Builds a [[TemporalTableFunction]] over the given table.
+    *
+    * @param table         the history table backing the function
+    * @param timeAttribute expression identifying the table's time attribute
+    * @param primaryKey    name of the table's primary key field
+    * @return a function whose result row type mirrors the table schema's types and column names
+    */
+  def create(
+      table: Table,
+      timeAttribute: Expression,
+      primaryKey: String): TemporalTableFunction = {
+    val schema = table.getSchema
+    val rowType = new RowTypeInfo(schema.getTypes, schema.getColumnNames)
+    new TemporalTableFunction(table, timeAttribute, primaryKey, rowType)
+  }
+}
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 7579621..84e3f79 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -686,6 +686,20 @@ case class WindowAggregate(
}
}
+/**
+  * Logical node that pairs a child plan with the time attribute and primary key expressions
+  * of a temporal table. Its output attributes are exactly those of the child.
+  *
+  * The node cannot be translated into a relational expression: `construct` always throws.
+  */
+case class TemporalTable(
+    timeAttribute: Expression,
+    primaryKey: Expression,
+    child: LogicalNode)
+  extends UnaryNode {
+
+  override def output: Seq[Attribute] = child.output
+
+  override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder =
+    throw new UnsupportedOperationException(
+      "This should never be called. This node is supposed to be used only for validation")
+}
+
/**
* LogicalNode for calling a user-defined table functions.
*
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index 4bf1ca1..0ed2301 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -21,14 +21,16 @@ package org.apache.flink.table.plan.nodes.logical
import java.lang.reflect.Type
import java.util.{List => JList, Set => JSet}
-import org.apache.calcite.plan.{Convention, RelOptCluster, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptCluster, RelOptRuleCall, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.convert.ConverterRule
-import org.apache.calcite.rel.core.{TableFunctionScan, TableScan}
-import org.apache.calcite.rel.logical.{LogicalTableFunctionScan, LogicalTableScan}
+import org.apache.calcite.rel.core.TableFunctionScan
+import org.apache.calcite.rel.logical.LogicalTableFunctionScan
import org.apache.calcite.rel.metadata.RelColumnMapping
-import org.apache.calcite.rex.RexNode
+import org.apache.calcite.rex.{RexCall, RexNode, RexVisitorImpl}
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.plan.nodes.FlinkConventions
class FlinkLogicalTableFunctionScan(
@@ -74,6 +76,32 @@ class FlinkLogicalTableFunctionScanConverter
FlinkConventions.LOGICAL,
"FlinkLogicalTableFunctionScanConverter") {
+ /**
+ * This rule does not match [[TemporalTableFunction]] calls. We do not support reading from
+ * [[TemporalTableFunction]]s as TableFunctions. We expect them to be rewritten into
+ * [[org.apache.flink.table.plan.nodes.datastream.DataStreamScan]] followed by, for
+ * example, [[org.apache.flink.table.plan.nodes.datastream.DataStreamTemporalTableJoin]].
+ */
+  override def matches(call: RelOptRuleCall): Boolean = {
+    // The rule applies to every table function scan except temporal table function calls.
+    val scan = call.rel[LogicalTableFunctionScan](0)
+    !isTemporalTableFunctionCall(scan)
+  }
+
+  /**
+    * Checks whether the scan's call invokes a [[TemporalTableFunction]].
+    *
+    * @param logicalTableFunction the table function scan to inspect
+    * @return true only if the call is a [[RexCall]] whose operator is a [[TableSqlFunction]]
+    *         wrapping a [[TemporalTableFunction]]; false in every other case
+    */
+  private def isTemporalTableFunctionCall(logicalTableFunction: LogicalTableFunctionScan)
+    : Boolean = {
+    // Type patterns never match null, so a missing call or operator yields false.
+    logicalTableFunction.getCall match {
+      case rexCall: RexCall =>
+        rexCall.getOperator match {
+          case tableFunction: TableSqlFunction =>
+            tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+          case _ => false
+        }
+      case _ => false
+    }
+  }
+
def convert(rel: RelNode): RelNode = {
val scan = rel.asInstanceOf[LogicalTableFunctionScan]
val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
new file mode 100644
index 0000000..0942dd3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.stream.table
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.{TableSchema, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.ResolvedFieldReference
+import org.apache.flink.table.functions.TemporalTableFunction
+import org.apache.flink.table.utils._
+import org.junit.Assert.{assertArrayEquals, assertEquals, assertTrue}
+import org.junit.Test
+
+/**
+  * Tests creating temporal table functions through the Table API and checks that scanning
+  * such a function directly is rejected.
+  */
+class TemporalTableJoinTest extends TableTestBase {
+
+  val util: TableTestUtil = streamTestUtil()
+
+  val ratesHistory = util.addTable[(String, Int, Timestamp)](
+    "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+
+  val rates = util.addFunction(
+    "Rates",
+    ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+
+  /** Using the temporal table function as a plain table function scan must fail. */
+  @Test
+  def testTemporalTableFunctionScan(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage(
+      "Cannot translate a query with an unbounded table function call")
+
+    val scanned = rates(Timestamp.valueOf("2016-06-27 10:10:42.123"))
+    util.printTable(scanned)
+  }
+
+  /** A processing-time attribute is accepted as the time attribute. */
+  @Test
+  def testProcessingTimeIndicatorVersion(): Unit = {
+    // Uses its own test utility and table so the class-level fixtures are not involved.
+    val procTimeUtil: TableTestUtil = streamTestUtil()
+    val procTimeHistory = procTimeUtil.addTable[(String, Int)](
+      "RatesHistory", 'currency, 'rate, 'proctime.proctime)
+    val procTimeRates = procTimeHistory.createTemporalTableFunction('proctime, 'currency)
+    assertRatesFunction(procTimeHistory.getSchema, procTimeRates, proctime = true)
+  }
+
+  /** The String-based overload yields the same function as the expression-based one. */
+  @Test
+  def testValidStringFieldReference(): Unit = {
+    val stringBasedRates = ratesHistory.createTemporalTableFunction("rowtime", "currency")
+    assertRatesFunction(ratesHistory.getSchema, stringBasedRates)
+  }
+
+  /**
+    * Asserts the primary key, the resolved time attribute name and the result type of `rates`.
+    *
+    * @param expectedSchema schema the function's result type must mirror
+    * @param rates          the function under test
+    * @param proctime       whether the expected time attribute is "proctime" (else "rowtime")
+    */
+  private def assertRatesFunction(
+      expectedSchema: TableSchema,
+      rates: TemporalTableFunction,
+      proctime: Boolean = false): Unit = {
+    val expectedTimeField = if (proctime) "proctime" else "rowtime"
+    val timeAttribute = rates.getTimeAttribute
+    val resultType = rates.getResultType
+
+    assertEquals("currency", rates.getPrimaryKey)
+    assertTrue(timeAttribute.isInstanceOf[ResolvedFieldReference])
+    assertEquals(expectedTimeField, timeAttribute.asInstanceOf[ResolvedFieldReference].name)
+    assertArrayEquals(
+      expectedSchema.getColumnNames.asInstanceOf[Array[Object]],
+      resultType.getFieldNames.asInstanceOf[Array[Object]])
+    assertArrayEquals(
+      expectedSchema.getTypes.asInstanceOf[Array[Object]],
+      resultType.getFieldTypes.asInstanceOf[Array[Object]])
+  }
+}
+
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
new file mode 100644
index 0000000..71b1585
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/TemporalTableJoinValidationTest.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.table.validation
+
+import java.sql.Timestamp
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils._
+import org.junit.Test
+
+/**
+  * Validation tests: creating a temporal table function with an unknown primary key field
+  * must be rejected, for both the expression-based and the String-based overloads.
+  */
+class TemporalTableJoinValidationTest extends TableTestBase {
+
+  val util: TableTestUtil = streamTestUtil()
+
+  val orders = util.addTable[(Long, String, Timestamp)](
+    "Orders", 'o_amount, 'o_currency, 'o_rowtime.rowtime)
+
+  val ratesHistory = util.addTable[(String, Int, Timestamp)](
+    "RatesHistory", 'currency, 'rate, 'rowtime.rowtime)
+
+  @Test
+  def testInvalidFieldReference(): Unit = {
+    expectUnresolvedFoobar()
+
+    ratesHistory.createTemporalTableFunction('rowtime, 'foobar)
+  }
+
+  @Test
+  def testInvalidStringFieldReference(): Unit = {
+    expectUnresolvedFoobar()
+
+    ratesHistory.createTemporalTableFunction("rowtime", "foobar")
+  }
+
+  /** Registers the expectation shared by both tests: `foobar` cannot be resolved. */
+  private def expectUnresolvedFoobar(): Unit = {
+    expectedException.expect(classOf[ValidationException])
+    expectedException.expectMessage("Cannot resolve field [foobar]")
+  }
+}
+
+