Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 20:04:57 UTC
flink git commit: [FLINK-6334] [table] Refactor Table API TableFunction join methods.
Repository: flink
Updated Branches:
refs/heads/master 8ed85fe49 -> c969237fc
[FLINK-6334] [table] Refactor Table API TableFunction join methods.
This closes #3791.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c969237f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c969237f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c969237f
Branch: refs/heads/master
Commit: c969237fce4fe5394e1cfdbb1186db63333d73d0
Parents: 8ed85fe
Author: Xpray <le...@gmail.com>
Authored: Wed May 3 16:52:43 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Wed May 3 21:51:21 2017 +0200
----------------------------------------------------------------------
.../flink/table/api/TableEnvironment.scala | 2 +-
.../api/scala/TableFunctionConversions.scala | 56 ++++
.../apache/flink/table/api/scala/package.scala | 4 +
.../org/apache/flink/table/api/table.scala | 288 +++++++++----------
.../flink/table/functions/TableFunction.scala | 16 --
.../utils/UserDefinedFunctionUtils.scala | 39 ++-
.../flink/table/plan/logical/operators.scala | 22 +-
.../api/scala/batch/table/JoinITCase.scala | 25 +-
.../table/UserDefinedTableFunctionTest.scala | 18 +-
.../table/UserDefinedTableFunctionTest.scala | 126 +++++++-
10 files changed, 404 insertions(+), 192 deletions(-)
----------------------------------------------------------------------
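In user-facing terms, this refactoring represents a TableFunction call as a Table rather than an Expression, so UDTF joins now go through the regular join(Table) and leftOuterJoin(Table) methods. A minimal sketch of the new usage in Scala, assuming a MySplitUDTF like the one in the docs added by this commit:

    // Scala Table API: an implicit conversion turns split('c) into a Table
    val split = new MySplitUDTF()   // extends TableFunction[String]
    table.join(split('c) as 's).select('a, 'b, 'c, 's)
    table.leftOuterJoin(split('c) as 's).select('a, 'b, 'c, 's)

    // Java Table API (written here in Scala, as the tests below do): the call
    // is parsed from a String via the new Table(tableEnv, String) constructor
    tableEnv.registerFunction("split", split)
    table.join(new Table(tableEnv, "split(c)").as("s")).select("a, b, c, s")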
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index bd974b0..45267d2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -79,7 +79,7 @@ abstract class TableEnvironment(val config: TableConfig) {
private val rootSchema: SchemaPlus = internalSchema.plus()
// Table API/SQL function catalog
- private val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
+ private[flink] val functionCatalog: FunctionCatalog = FunctionCatalog.withBuiltIns
// the configuration to create a Calcite planner
private lazy val frameworkConfig: FrameworkConfig = Frameworks
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
new file mode 100644
index 0000000..692876f
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/TableFunctionConversions.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Table
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.functions.TableFunction
+import org.apache.flink.table.plan.logical.LogicalTableFunctionCall
+
+/**
+ * Holds methods to convert a [[TableFunction]] call in the Scala Table API into a [[Table]].
+ *
+ * @param tf The TableFunction to convert.
+ */
+class TableFunctionConversions[T](tf: TableFunction[T]) {
+
+ /**
+ * Creates a [[Table]] from a [[TableFunction]] in the Scala Table API.
+ *
+ * @param args The arguments of the table function call.
+ * @return A [[Table]] that represents the [[LogicalTableFunctionCall]].
+ */
+ final def apply(args: Expression*)(implicit typeInfo: TypeInformation[T]): Table = {
+
+ val resultType = if (tf.getResultType == null) typeInfo else tf.getResultType
+
+ new Table(
+ tableEnv = null, // Table environment will be set later.
+ LogicalTableFunctionCall(
+ tf.getClass.getCanonicalName,
+ tf,
+ args.toList,
+ resultType,
+ Array.empty,
+ child = null // Child will be set later.
+ )
+ )
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
index 5b431ec..e8a2017 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/package.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.scala.DataSet
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.table.api.scala.{StreamTableEnvironment => ScalaStreamTableEnv}
import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTableEnv}
+import org.apache.flink.table.functions.TableFunction
import _root_.scala.language.implicitConversions
@@ -89,4 +90,7 @@ package object scala extends ImplicitExpressionConversions {
tableEnv.toDataStream[Row](table)
}
+ implicit def tableFunctionCall2Table[T](tf: TableFunction[T]): TableFunctionConversions[T] = {
+ new TableFunctionConversions[T](tf)
+ }
}
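Taken together, TableFunctionConversions and the implicit above replace the old TableFunction.apply (removed further down): applying a TableFunction instance to expressions now yields an unbounded Table whose tableEnv and child are still null. A small sketch, assuming the MySplitUDTF from the commit's docs:

    val split = new MySplitUDTF()    // TableFunction[String]
    val call: Table = split('c)      // via tableFunctionCall2Table and apply
    // the wrapped LogicalTableFunctionCall has child = null; only join and
    // leftOuterJoin may consume such a Table, anything else fails validation:
    // call.select('f0)  // ValidationException: TableFunction can only be
    //                   // used in join and leftOuterJoin.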
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
index 0953611..9606979 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala
@@ -20,9 +20,10 @@ package org.apache.flink.table.api
import org.apache.calcite.rel.RelNode
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.operators.join.JoinType
-import org.apache.flink.table.calcite.FlinkTypeFactory
+import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
+import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionParser, Ordering, UnresolvedAlias, UnresolvedFieldReference}
import org.apache.flink.table.plan.logical.Minus
-import org.apache.flink.table.expressions.{Alias, Asc, Call, Expression, ExpressionParser, Ordering, TableFunctionCall, UnresolvedAlias}
+import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.plan.ProjectionTranslator._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.sinks.TableSink
@@ -64,13 +65,34 @@ class Table(
private[flink] val tableEnv: TableEnvironment,
private[flink] val logicalPlan: LogicalNode) {
- private val tableSchema: TableSchema = new TableSchema(
+ // Check if the plan has an unbounded TableFunctionCall as a child node.
+ // A TableFunctionCall is tolerated as the root node because the Table holds the initial call.
+ if (containsUnboundedUDTFCall(logicalPlan) &&
+ !logicalPlan.isInstanceOf[LogicalTableFunctionCall]) {
+ throw new ValidationException("TableFunction can only be used in join and leftOuterJoin.")
+ }
+
+ /**
+ * Creates a [[Table]] for a TableFunction call from a String expression.
+ *
+ * @param tableEnv The TableEnvironment in which the call is created.
+ * @param udtfCall A String expression of the TableFunction call.
+ */
+ def this(tableEnv: TableEnvironment, udtfCall: String) {
+ this(tableEnv, UserDefinedFunctionUtils.createLogicalFunctionCall(tableEnv, udtfCall))
+ }
+
+ private lazy val tableSchema: TableSchema = new TableSchema(
logicalPlan.output.map(_.name).toArray,
logicalPlan.output.map(_.resultType).toArray)
- def relBuilder = tableEnv.getRelBuilder
+ def relBuilder: FlinkRelBuilder = tableEnv.getRelBuilder
- def getRelNode: RelNode = logicalPlan.toRelNode(relBuilder)
+ def getRelNode: RelNode = if (containsUnboundedUDTFCall(logicalPlan)) {
+ throw new ValidationException("Cannot translate a query with an unbounded table function call.")
+ } else {
+ logicalPlan.toRelNode(relBuilder)
+ }
/**
* Returns the schema of this table.
@@ -143,7 +165,35 @@ class Table(
* }}}
*/
def as(fields: Expression*): Table = {
- new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+
+ logicalPlan match {
+ case functionCall: LogicalTableFunctionCall if functionCall.child == null => {
+ // If the logical plan is a TableFunctionCall, we replace its field names to avoid special
+ // cases during the validation.
+ if (fields.length != functionCall.output.length) {
+ throw new ValidationException(
+ "List of column aliases must have the same degree as the TableFunction's output")
+ }
+ if (!fields.forall(_.isInstanceOf[UnresolvedFieldReference])) {
+ throw new ValidationException(
+ "Alias field must be an instance of UnresolvedFieldReference"
+ )
+ }
+ new Table(
+ tableEnv,
+ new LogicalTableFunctionCall(
+ functionCall.functionName,
+ functionCall.tableFunction,
+ functionCall.parameters,
+ functionCall.resultType,
+ fields.map(_.asInstanceOf[UnresolvedFieldReference].name).toArray,
+ functionCall.child)
+ )
+ }
+ case _ =>
+ // prepend an AliasNode
+ new Table(tableEnv, AliasNode(fields, logicalPlan).validate(tableEnv))
+ }
}
/**
@@ -310,6 +360,41 @@ class Table(
}
/**
+ * Joins this [[Table]] with a user-defined [[org.apache.flink.table.functions.TableFunction]].
+ * This join is similar to a SQL left outer join with an ON TRUE predicate, but it works with a
+ * table function. Each row of the outer table is joined with all rows produced by the table
+ * function. If the table function does not produce any row, the outer row is padded with nulls.
+ *
+ * Scala Example:
+ * {{{
+ * class MySplitUDTF extends TableFunction[String] {
+ * def eval(str: String): Unit = {
+ * str.split("#").foreach(collect)
+ * }
+ * }
+ *
+ * val split = new MySplitUDTF()
+ * table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
+ * }}}
+ *
+ * Java Example:
+ * {{{
+ * class MySplitUDTF extends TableFunction<String> {
+ * public void eval(String str) {
+ * str.split("#").forEach(this::collect);
+ * }
+ * }
+ *
+ * TableFunction<String> split = new MySplitUDTF();
+ * tableEnv.registerFunction("split", split);
+ * table.leftOuterJoin(new Table(tableEnv, "split(c)").as("s")).select("a, b, c, s");
+ * }}}
+ */
+ def leftOuterJoin(right: Table): Table = {
+ join(right, None, JoinType.LEFT_OUTER)
+ }
+
+ /**
* Joins two [[Table]]s. Similar to an SQL left outer join. The fields of the two joined
* tables must not overlap; use [[as]] to rename fields if necessary.
*
@@ -417,14 +502,46 @@ class Table(
}
private def join(right: Table, joinPredicate: Option[Expression], joinType: JoinType): Table = {
- // check that right table belongs to the same TableEnvironment
- if (right.tableEnv != this.tableEnv) {
- throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+
+ // check if we join with a table or a table function
+ if (!containsUnboundedUDTFCall(right.logicalPlan)) {
+ // regular table-table join
+
+ // check that the TableEnvironment of the right table is not null
+ // and that the right table belongs to the same TableEnvironment
+ if (right.tableEnv != this.tableEnv) {
+ throw new ValidationException("Only tables from the same TableEnvironment can be joined.")
+ }
+
+ new Table(
+ tableEnv,
+ Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
+ .validate(tableEnv))
+
+ } else {
+ // join with a table function
+
+ // check join type
+ if (joinType != JoinType.INNER && joinType != JoinType.LEFT_OUTER) {
+ throw new ValidationException(
+ "TableFunctions are currently only supported for join and leftOuterJoin.")
+ }
+
+ val udtf = right.logicalPlan.asInstanceOf[LogicalTableFunctionCall]
+ val udtfCall = LogicalTableFunctionCall(
+ udtf.functionName,
+ udtf.tableFunction,
+ udtf.parameters,
+ udtf.resultType,
+ udtf.fieldNames,
+ this.logicalPlan
+ ).validate(tableEnv)
+
+ new Table(
+ tableEnv,
+ Join(this.logicalPlan, udtfCall, joinType, joinPredicate, correlated = true)
+ .validate(tableEnv))
}
- new Table(
- tableEnv,
- Join(this.logicalPlan, right.logicalPlan, joinType, joinPredicate, correlated = false)
- .validate(tableEnv))
}
/**
@@ -633,136 +750,6 @@ class Table(
}
/**
- * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
- * to an SQL cross join, but it works with a table function. It returns rows from the outer
- * table (table on the left of the operator) that produces matching values from the table
- * function (which is defined in the expression on the right side of the operator).
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction[String] {
- * def eval(str: String): Unit = {
- * str.split("#").foreach(collect)
- * }
- * }
- *
- * val split = new MySplitUDTF()
- * table.join(split('c) as ('s)).select('a,'b,'c,'s)
- * }}}
- */
- def join(udtf: Expression): Table = {
- joinUdtfInternal(udtf, JoinType.INNER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
- * to an SQL cross join, but it works with a table function. It returns rows from the outer
- * table (table on the left of the operator) that produces matching values from the table
- * function (which is defined in the expression on the right side of the operator).
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction<String> {
- * public void eval(String str) {
- * str.split("#").forEach(this::collect);
- * }
- * }
- *
- * TableFunction<String> split = new MySplitUDTF();
- * tableEnv.registerFunction("split", split);
- *
- * table.join("split(c) as (s)").select("a, b, c, s");
- * }}}
- */
- def join(udtf: String): Table = {
- joinUdtfInternal(udtf, JoinType.INNER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
- * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
- * the rows from the outer table (table on the left of the operator), and rows that do not match
- * the condition from the table function (which is defined in the expression on the right
- * side of the operator). Rows with no matching condition are filled with null values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction[String] {
- * def eval(str: String): Unit = {
- * str.split("#").foreach(collect)
- * }
- * }
- *
- * val split = new MySplitUDTF()
- * table.leftOuterJoin(split('c) as ('s)).select('a,'b,'c,'s)
- * }}}
- */
- def leftOuterJoin(udtf: Expression): Table = {
- joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
- }
-
- /**
- * Joins this [[Table]] to a user-defined [[org.apache.calcite.schema.TableFunction]]. Similar
- * to an SQL left outer join with ON TRUE, but it works with a table function. It returns all
- * the rows from the outer table (table on the left of the operator), and rows that do not match
- * the condition from the table function (which is defined in the expression on the right
- * side of the operator). Rows with no matching condition are filled with null values.
- *
- * Example:
- *
- * {{{
- * class MySplitUDTF extends TableFunction<String> {
- * public void eval(String str) {
- * str.split("#").forEach(this::collect);
- * }
- * }
- *
- * TableFunction<String> split = new MySplitUDTF();
- * tableEnv.registerFunction("split", split);
- *
- * table.leftOuterJoin("split(c) as (s)").select("a, b, c, s");
- * }}}
- */
- def leftOuterJoin(udtf: String): Table = {
- joinUdtfInternal(udtf, JoinType.LEFT_OUTER)
- }
-
- private def joinUdtfInternal(udtfString: String, joinType: JoinType): Table = {
- val udtf = ExpressionParser.parseExpression(udtfString)
- joinUdtfInternal(udtf, joinType)
- }
-
- private def joinUdtfInternal(udtf: Expression, joinType: JoinType): Table = {
- var alias: Option[Seq[String]] = None
-
- // unwrap an Expression until we get a TableFunctionCall
- def unwrap(expr: Expression): TableFunctionCall = expr match {
- case Alias(child, name, extraNames) =>
- alias = Some(Seq(name) ++ extraNames)
- unwrap(child)
- case Call(name, args) =>
- val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
- unwrap(function)
- case c: TableFunctionCall => c
- case _ =>
- throw new TableException(
- "Cross/Outer Apply operators only accept expressions that define table functions.")
- }
-
- val call = unwrap(udtf)
- .as(alias)
- .toLogicalTableFunctionCall(this.logicalPlan)
- .validate(tableEnv)
-
- new Table(
- tableEnv,
- Join(this.logicalPlan, call, joinType, None, correlated = true).validate(tableEnv))
- }
-
- /**
* Writes the [[Table]] to a [[TableSink]]. A [[TableSink]] defines an external storage location.
*
* A batch [[Table]] can only be written to a
@@ -773,7 +760,6 @@ class Table(
* @tparam T The data type that the [[TableSink]] expects.
*/
def writeToSink[T](sink: TableSink[T]): Unit = {
-
// get schema information of table
val rowType = getRelNode.getRowType
val fieldNames: Array[String] = rowType.getFieldNames.asScala.toArray
@@ -859,6 +845,20 @@ class Table(
}
tableName
}
+
+ /**
+ * Checks if the plan represented by a [[LogicalNode]] contains an unbounded UDTF call.
+ * @param n the node to check
+ * @return true if the plan contains an unbounded UDTF call, false otherwise.
+ */
+ private def containsUnboundedUDTFCall(n: LogicalNode): Boolean = {
+ n match {
+ case functionCall: LogicalTableFunctionCall if functionCall.child == null => true
+ case u: UnaryNode => containsUnboundedUDTFCall(u.child)
+ case b: BinaryNode => containsUnboundedUDTFCall(b.left) || containsUnboundedUDTFCall(b.right)
+ case _: LeafNode => false
+ }
+ }
}
/**
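The join path above now dispatches on containsUnboundedUDTFCall: a regular table-table join still validates that both sides share a TableEnvironment, while a UDTF join completes the unbounded call with this.logicalPlan as its child and builds a correlated Join. A sketch of both branches from the caller's side, assuming tables t1('a, 'b, 'c) and t2('d, 'e) plus the split function from the docs:

    // regular join: both tables must come from the same TableEnvironment
    t1.join(t2, 'a === 'd).select('a, 'b, 'e)

    // correlated UDTF join: the right side is an unbounded call
    t1.join(split('c) as 's)            // JoinType.INNER
    t1.leftOuterJoin(split('c) as 's)   // JoinType.LEFT_OUTER, padded with nulls
    // any other join type against a UDTF is rejected by the joinType check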
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
index d4c5021..5354349 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala
@@ -19,7 +19,6 @@
package org.apache.flink.table.functions
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.expressions.{Expression, TableFunctionCall}
import org.apache.flink.util.Collector
/**
@@ -79,21 +78,6 @@ import org.apache.flink.util.Collector
*/
abstract class TableFunction[T] extends UserDefinedFunction {
- /**
- * Creates a call to a [[TableFunction]] in Scala Table API.
- *
- * @param params actual parameters of function
- * @return [[Expression]] in form of a [[TableFunctionCall]]
- */
- final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
- val resultType = if (getResultType == null) {
- typeInfo
- } else {
- getResultType
- }
- TableFunctionCall(getClass.getSimpleName, this, params, resultType)
- }
-
override def toString: String = getClass.getCanonicalName
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index c1cfe06..d108e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -19,7 +19,7 @@
package org.apache.flink.table.functions.utils
-import java.lang.{Long => JLong, Integer => JInt}
+import java.lang.{Integer => JInt, Long => JLong}
import java.lang.reflect.{Method, Modifier}
import java.sql.{Date, Time, Timestamp}
@@ -30,8 +30,10 @@ import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.{ScalarFunction, TableFunction, UserDefinedFunction}
+import org.apache.flink.table.plan.logical._
import org.apache.flink.table.plan.schema.FlinkTableFunctionImpl
import org.apache.flink.util.InstantiationUtil
@@ -358,4 +360,37 @@ object UserDefinedFunctionUtils {
InstantiationUtil
.deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader)
}
+
+ /**
+ * Creates a [[LogicalTableFunctionCall]] by parsing a String expression.
+ *
+ * @param tableEnv The table environment used to look up the function.
+ * @param udtf a String expression of a TableFunctionCall, such as "split(c)"
+ * @return A LogicalTableFunctionCall.
+ */
+ def createLogicalFunctionCall(
+ tableEnv: TableEnvironment,
+ udtf: String): LogicalTableFunctionCall = {
+
+ var alias: Option[Seq[String]] = None
+
+ // unwrap an Expression until we get a TableFunctionCall
+ def unwrap(expr: Expression): TableFunctionCall = expr match {
+ case Alias(child, name, extraNames) =>
+ alias = Some(Seq(name) ++ extraNames)
+ unwrap(child)
+ case Call(name, args) =>
+ val function = tableEnv.functionCatalog.lookupFunction(name, args)
+ unwrap(function)
+ case c: TableFunctionCall => c
+ case _ =>
+ throw new TableException(
+ "Table(TableEnv, String) constructor only accepts a String that " +
+ "defines a table function, optionally followed by an alias.")
+ }
+
+ val functionCall: LogicalTableFunctionCall = unwrap(ExpressionParser.parseExpression(udtf))
+ .as(alias).toLogicalTableFunctionCall(child = null)
+ functionCall
+ }
}
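createLogicalFunctionCall backs the new Table(tableEnv, String) constructor used by the Java Table API: it parses the String, looks the function up in the catalog, and unwraps any Alias sitting on top of the TableFunctionCall. A sketch of accepted and rejected inputs, assuming a table function "split" and the scalar test function Func0 (see the tests below) are registered:

    // accepted: a table function call, optionally aliased
    new Table(tableEnv, "split(c)")
    new Table(tableEnv, "split(c) as (s)")
    new Table(tableEnv, "as(split(c), s)")

    // rejected: anything that does not unwrap to a TableFunctionCall,
    // e.g. a scalar function, hits the TableException above
    // new Table(tableEnv, "func0(a)")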
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index dbfd3ce..c67bfd1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -170,7 +170,7 @@ case class Limit(offset: Int, fetch: Int = -1, child: LogicalNode) extends Unary
if (tableEnv.isInstanceOf[StreamTableEnvironment]) {
failValidation(s"Limit on stream tables is currently not supported.")
}
- if (!child.validate(tableEnv).isInstanceOf[Sort]) {
+ if (!child.isInstanceOf[Sort]) {
failValidation(s"Limit operator must be preceded by an OrderBy operator.")
}
if (offset < 0) {
@@ -662,11 +662,19 @@ case class LogicalTableFunctionCall(
child: LogicalNode)
extends UnaryNode {
- private val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
+ private val (generatedNames, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
private var evalMethod: Method = _
- override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
- case (n, t) => ResolvedFieldReference(n, t)
+ override def output: Seq[Attribute] = {
+ if (fieldNames.isEmpty) {
+ generatedNames.zip(fieldTypes).map {
+ case (n, t) => ResolvedFieldReference(n, t)
+ }
+ } else {
+ fieldNames.zip(fieldTypes).map {
+ case (n, t) => ResolvedFieldReference(n, t)
+ }
+ }
}
override def validate(tableEnv: TableEnvironment): LogicalNode = {
@@ -691,7 +699,11 @@ case class LogicalTableFunctionCall(
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
val fieldIndexes = getFieldInfo(resultType)._2
- val function = new FlinkTableFunctionImpl(resultType, fieldIndexes, fieldNames, evalMethod)
+ val function = new FlinkTableFunctionImpl(
+ resultType,
+ fieldIndexes,
+ if (fieldNames.isEmpty) generatedNames else fieldNames,
+ evalMethod)
val typeFactory = relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val sqlFunction = TableSqlFunction(
tableFunction.functionIdentifier,
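Keeping the generated names around means a call without an explicit alias now resolves its output fields from the result type instead of an empty fieldNames array. A sketch, assuming TableFunc2 emits (String, Int) rows and that the generated names follow Flink's usual defaults (e.g. f0, f1):

    val func2 = new TableFunc2
    // no alias: the output uses the generated field names
    table.join(func2('c)).select('f0, 'f1)
    // explicit alias: fieldNames is non-empty and takes precedence
    table.join(func2('c) as ('name, 'len)).select('name, 'len)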
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index 5993728..8085a3c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -24,8 +24,9 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
import org.apache.flink.table.api.scala._
import org.apache.flink.api.scala.util.CollectionDataSets
import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableFunc2
import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
import org.apache.flink.test.util.TestBaseUtils
import org.junit._
@@ -345,4 +346,26 @@ class JoinITCase(
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ @Test
+ def testUDTFJoinOnTuples(): Unit = {
+ val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+ val data = List("hi#world", "how#are#you")
+
+ val ds1 = env.fromCollection(data).toTable(tEnv, 'a)
+ val func2 = new TableFunc2
+
+ val joinDs = ds1.join(func2('a) as ('name, 'len))
+
+ val results = joinDs.toDataSet[Row].collect()
+ val expected = Seq(
+ "hi#world,hi,2",
+ "hi#world,world,5",
+ "how#are#you,how,3",
+ "how#are#you,are,3",
+ "how#are#you,you,3").mkString("\n")
+ TestBaseUtils.compareResultAsText(results.asJava, expected)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
index 2dbcccf..4d7f6cb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
-import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.api.{Table, TableEnvironment, Types}
import org.junit.Test
import org.mockito.Mockito._
@@ -54,24 +54,24 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val func1 = new TableFunc1
javaTableEnv.registerFunction("func1", func1)
var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join("func1(c).as(s)").select("c, s")
+ var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test left outer join
scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+ javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "as(func1(c), s)")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test overloading
scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+ javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test custom result type
val func2 = new TableFunc2
javaTableEnv.registerFunction("func2", func2)
scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+ javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
verifyTableEquals(scalaTable, javaTable)
// test hierarchy generic type
@@ -79,7 +79,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
javaTableEnv.registerFunction("hierarchy", hierarchy)
scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
.select('c, 'name, 'len, 'adult)
- javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+ javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
.select("c, name, len, adult")
verifyTableEquals(scalaTable, javaTable)
@@ -88,21 +88,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
javaTableEnv.registerFunction("pojo", pojo)
scalaTable = in1.join(pojo('c))
.select('c, 'name, 'age)
- javaTable = in2.join("pojo(c)")
+ javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
.select("c, name, age")
verifyTableEquals(scalaTable, javaTable)
// test with filter
scalaTable = in1.join(func2('c) as ('name, 'len))
.select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join("func2(c) as (name, len)")
+ javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
.select("c, name, len").filter("len > 2")
verifyTableEquals(scalaTable, javaTable)
// test with scalar function
scalaTable = in1.join(func1('c.substring(2)) as 's)
.select('a, 'c, 's)
- javaTable = in2.join("func1(substring(c, 2)) as (s)")
+ javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
.select("a, c, s")
verifyTableEquals(scalaTable, javaTable)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c969237f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index 56b9fdb..3edfd8c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -57,24 +57,24 @@ class UserDefinedTableFunctionTest extends TableTestBase {
val func1 = new TableFunc1
javaTableEnv.registerFunction("func1", func1)
var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join("func1(c).as(s)").select("c, s")
+ var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test left outer join
scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+ javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "func1(c)").as("s")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test overloading
scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+ javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
verifyTableEquals(scalaTable, javaTable)
// test custom result type
val func2 = new TableFunc2
javaTableEnv.registerFunction("func2", func2)
scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+ javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
verifyTableEquals(scalaTable, javaTable)
// test hierarchy generic type
@@ -82,7 +82,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
javaTableEnv.registerFunction("hierarchy", hierarchy)
scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
.select('c, 'name, 'len, 'adult)
- javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+ javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
.select("c, name, len, adult")
verifyTableEquals(scalaTable, javaTable)
@@ -91,21 +91,21 @@ class UserDefinedTableFunctionTest extends TableTestBase {
javaTableEnv.registerFunction("pojo", pojo)
scalaTable = in1.join(pojo('c))
.select('c, 'name, 'age)
- javaTable = in2.join("pojo(c)")
+ javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
.select("c, name, age")
verifyTableEquals(scalaTable, javaTable)
// test with filter
scalaTable = in1.join(func2('c) as ('name, 'len))
.select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join("func2(c) as (name, len)")
+ javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
.select("c, name, len").filter("len > 2")
verifyTableEquals(scalaTable, javaTable)
// test with scalar function
scalaTable = in1.join(func1('c.substring(2)) as 's)
.select('a, 'c, 's)
- javaTable = in2.join("func1(substring(c, 2)) as (s)")
+ javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
.select("a, c, s")
verifyTableEquals(scalaTable, javaTable)
@@ -114,8 +114,103 @@ class UserDefinedTableFunctionTest extends TableTestBase {
tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
expectExceptionThrown(
javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+ expectExceptionThrown(in1.join(ObjectTableFunction('a, 1)), "Scala object")
+
+ }
+
+ @Test
+ def testInvalidTableFunctions(): Unit = {
+ // mock
+ val ds = mock(classOf[DataStream[Row]])
+ val jDs = mock(classOf[JDataStream[Row]])
+ val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+ when(ds.javaStream).thenReturn(jDs)
+ when(jDs.getType).thenReturn(typeInfo)
+
+ // Scala environment
+ val env = mock(classOf[ScalaExecutionEnv])
+ val tableEnv = TableEnvironment.getTableEnvironment(env)
+ val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+ // Java environment
+ val javaEnv = mock(classOf[JavaExecutionEnv])
+ val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+ val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
+
+ val func1 = new TableFunc1
+ javaTableEnv.registerFunction("func1", func1)
+
+ // table function call select
+ expectExceptionThrown(
+ func1('c).select("f0"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call select
+ expectExceptionThrown(
+ func1('c).select('f0),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call writeToSink
expectExceptionThrown(
- in1.join(ObjectTableFunction('a, 1)), "Scala object")
+ func1('c).writeToSink(null),
+ "Cannot translate a query with an unbounded table function call."
+ )
+
+ // table function call distinct
+ expectExceptionThrown(
+ func1('c).distinct(),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call filter
+ expectExceptionThrown(
+ func1('c).filter('f0 === "?"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call filter
+ expectExceptionThrown(
+ func1('c).filter("f0 = '?'"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call limit
+ expectExceptionThrown(
+ func1('c).orderBy('f0).limit(3),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call limit
+ expectExceptionThrown(
+ func1('c).orderBy('f0).limit(0, 3),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call orderBy
+ expectExceptionThrown(
+ func1('c).orderBy("f0"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call orderBy
+ expectExceptionThrown(
+ func1('c).orderBy('f0),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call where
+ expectExceptionThrown(
+ func1('c).where("f0 = '?'"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
+
+ // table function call where
+ expectExceptionThrown(
+ func1('c).where('f0 === "?"),
+ "TableFunction can only be used in join and leftOuterJoin."
+ )
}
@@ -137,7 +232,9 @@ class UserDefinedTableFunctionTest extends TableTestBase {
//============ throw exception when table function is not registered =========
// Java Table API call
- expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
+ expectExceptionThrown(
+ t.join(new Table(util.tEnv, "nonexist(a)")),
+ "Undefined function: NONEXIST")
// SQL API call
expectExceptionThrown(
util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
@@ -145,11 +242,12 @@ class UserDefinedTableFunctionTest extends TableTestBase {
//========= throw exception when the called function is a scalar function ====
- util.addFunction("func0", Func0)
+ util.tEnv.registerFunction("func0", Func0)
+
// Java Table API call
expectExceptionThrown(
- t.join("func0(a)"),
- "only accept expressions that define table functions",
+ t.join(new Table(util.tEnv, "func0(a)")),
+ "only accepts a String that defines a table function",
classOf[TableException])
// SQL API call
// NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
@@ -162,7 +260,7 @@ class UserDefinedTableFunctionTest extends TableTestBase {
// Java Table API call
util.addFunction("func2", new TableFunc2)
expectExceptionThrown(
- t.join("func2(c, c)"),
+ t.join(new Table(util.tEnv, "func2(c, c)")),
"Given parameters of function 'FUNC2' do not match any signature")
// SQL API call
expectExceptionThrown(