You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/12/07 15:57:24 UTC
[5/5] flink git commit: [FLINK-4469] [table] Minor improvements
[FLINK-4469] [table] Minor improvements
- Fixed typos
- Removed implicit conversion with TableCallBuilder
- Fixed bugs about expression parser alias and static eval methods
- Refactored tests
This closes #2653.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/684defbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/684defbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/684defbf
Branch: refs/heads/master
Commit: 684defbf33168e34657bc1a25607adb53be248c5
Parents: e139f59
Author: twalthr <tw...@apache.org>
Authored: Tue Dec 6 17:46:54 2016 +0100
Committer: twalthr <tw...@apache.org>
Committed: Wed Dec 7 16:55:37 2016 +0100
----------------------------------------------------------------------
docs/dev/table_api.md | 22 +
.../api/java/table/BatchTableEnvironment.scala | 3 +-
.../api/java/table/StreamTableEnvironment.scala | 3 +-
.../api/scala/table/BatchTableEnvironment.scala | 6 +-
.../scala/table/TableFunctionCallBuilder.scala | 39 --
.../flink/api/scala/table/expressionDsl.scala | 10 +-
.../flink/api/table/TableEnvironment.scala | 10 +-
.../flink/api/table/codegen/CodeGenerator.scala | 13 +-
.../codegen/calls/TableFunctionCallGen.scala | 3 +-
.../table/expressions/ExpressionParser.scala | 12 +-
.../flink/api/table/expressions/call.scala | 9 +-
.../api/table/functions/ScalarFunction.scala | 7 +-
.../api/table/functions/TableFunction.scala | 31 +-
.../functions/utils/ScalarSqlFunction.scala | 2 +-
.../functions/utils/TableSqlFunction.scala | 15 +-
.../utils/UserDefinedFunctionUtils.scala | 18 +-
.../api/table/plan/ProjectionTranslator.scala | 4 +-
.../api/table/plan/logical/operators.scala | 23 +-
.../api/table/plan/nodes/FlinkCorrelate.scala | 7 +-
.../plan/nodes/dataset/DataSetCorrelate.scala | 2 +-
.../nodes/datastream/DataStreamCorrelate.scala | 3 +-
.../rules/dataSet/DataSetCorrelateRule.scala | 7 +-
.../datastream/DataStreamCorrelateRule.scala | 12 +-
.../org/apache/flink/api/table/table.scala | 56 ++-
.../api/table/validate/FunctionCatalog.scala | 10 +-
.../src/test/resources/log4j-test.properties | 2 +-
.../batch/UserDefinedTableFunctionITCase.scala | 212 ----------
.../batch/UserDefinedTableFunctionTest.scala | 320 ---------------
.../sql/UserDefinedTableFunctionTest.scala | 238 +++++++++++
.../table/UserDefinedTableFunctionTest.scala | 179 +++++++++
.../stream/UserDefinedTableFunctionITCase.scala | 181 ---------
.../stream/UserDefinedTableFunctionTest.scala | 402 -------------------
.../sql/UserDefinedTableFunctionTest.scala | 237 +++++++++++
.../table/UserDefinedTableFunctionTest.scala | 385 ++++++++++++++++++
.../utils/UserDefinedTableFunctions.scala | 116 ------
.../dataset/DataSetCorrelateITCase.scala | 177 ++++++++
.../datastream/DataStreamCorrelateITCase.scala | 90 +++++
.../flink/api/table/utils/TableTestBase.scala | 15 +-
.../table/utils/UserDefinedTableFunctions.scala | 117 ++++++
39 files changed, 1611 insertions(+), 1387 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/docs/dev/table_api.md
----------------------------------------------------------------------
diff --git a/docs/dev/table_api.md b/docs/dev/table_api.md
index 848f9e4..6cf0dee 100644
--- a/docs/dev/table_api.md
+++ b/docs/dev/table_api.md
@@ -1494,6 +1494,17 @@ Both the Table API and SQL come with a set of built-in functions for data transf
<tr>
<td>
{% highlight java %}
+ANY.as(name [, name ]* )
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight java %}
ANY.isNull
{% endhighlight %}
</td>
@@ -2045,6 +2056,17 @@ COMPOSITE.get(INT)
<tr>
<td>
{% highlight scala %}
+ANY.as(name [, name ]* )
+{% endhighlight %}
+ </td>
+ <td>
+ <p>Specifies a name for an expression i.e. a field. Additional names can be specified if the expression expands to multiple fields.</p>
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ {% highlight scala %}
ANY.isNull
{% endhighlight %}
</td>
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
index b353377..3517338 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/BatchTableEnvironment.scala
@@ -168,7 +168,8 @@ class BatchTableEnvironment(
* Registered functions can be referenced in Table API and SQL queries.
*
* @param name The name under which the function is registered.
- * @param tf The TableFunction to register
+ * @param tf The TableFunction to register.
+ * @tparam T The type of the output row.
*/
def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
implicit val typeInfo: TypeInformation[T] = TypeExtractor
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
index 367cb82..83293e3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/java/table/StreamTableEnvironment.scala
@@ -170,7 +170,8 @@ class StreamTableEnvironment(
* Registered functions can be referenced in Table API and SQL queries.
*
* @param name The name under which the function is registered.
- * @param tf The TableFunction to register
+ * @param tf The TableFunction to register.
+ * @tparam T The type of the output row.
*/
def registerFunction[T](name: String, tf: TableFunction[T]): Unit = {
implicit val typeInfo: TypeInformation[T] = TypeExtractor
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
index 36885d2..f4bfe31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/BatchTableEnvironment.scala
@@ -142,13 +142,13 @@ class BatchTableEnvironment(
/**
* Registers a [[TableFunction]] under a unique name in the TableEnvironment's catalog.
- * Registered functions can be referenced in SQL queries.
+ * Registered functions can be referenced in Table API and SQL queries.
*
* @param name The name under which the function is registered.
- * @param tf The TableFunction to register
+ * @param tf The TableFunction to register.
+ * @tparam T The type of the output row.
*/
def registerFunction[T: TypeInformation](name: String, tf: TableFunction[T]): Unit = {
registerTableFunctionInternal(name, tf)
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
deleted file mode 100644
index 2261b70..0000000
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/TableFunctionCallBuilder.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.table
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
-import org.apache.flink.api.table.functions.TableFunction
-
-case class TableFunctionCallBuilder[T: TypeInformation](udtf: TableFunction[T]) {
- /**
- * Creates a call to a [[TableFunction]] in Scala Table API.
- *
- * @param params actual parameters of function
- * @return [[TableFunctionCall]]
- */
- def apply(params: Expression*): Expression = {
- val resultType = if (udtf.getResultType == null) {
- implicitly[TypeInformation[T]]
- } else {
- udtf.getResultType
- }
- TableFunctionCall(udtf.getClass.getSimpleName, udtf, params, resultType)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
index cc4c68d..175ce2e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/expressionDsl.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.table.expressions.ExpressionUtils.{toMilliInterval, toMonthInterval, toRowInterval}
import org.apache.flink.api.table.expressions.TimeIntervalUnit.TimeIntervalUnit
import org.apache.flink.api.table.expressions._
-import org.apache.flink.api.table.functions.TableFunction
import scala.language.implicitConversions
@@ -98,6 +97,13 @@ trait ImplicitExpressionOperations {
def cast(toType: TypeInformation[_]) = Cast(expr, toType)
+ /**
+ * Specifies a name for an expression i.e. a field.
+ *
+ * @param name name for one field
+ * @param extraNames additional names if the expression expands to multiple fields
+ * @return field with an alias
+ */
def as(name: Symbol, extraNames: Symbol*) = Alias(expr, name.name, extraNames.map(_.name))
def asc = Asc(expr)
@@ -540,8 +546,6 @@ trait ImplicitExpressionConversions {
implicit def sqlDate2Literal(sqlDate: Date): Expression = Literal(sqlDate)
implicit def sqlTime2Literal(sqlTime: Time): Expression = Literal(sqlTime)
implicit def sqlTimestamp2Literal(sqlTimestamp: Timestamp): Expression = Literal(sqlTimestamp)
- implicit def UDTF2TableFunctionCall[T: TypeInformation](udtf: TableFunction[T]):
- TableFunctionCallBuilder[T] = TableFunctionCallBuilder(udtf)
}
// ------------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
index 8cabadb..b6d0e31 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/TableEnvironment.scala
@@ -24,8 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.calcite.config.Lex
import org.apache.calcite.plan.RelOptPlanner
import org.apache.calcite.rel.`type`.RelDataType
-import org.apache.calcite.rex.RexExecutorImpl
-import org.apache.calcite.schema.{SchemaPlus, Schemas}
+import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
import org.apache.calcite.sql.SqlOperatorTable
import org.apache.calcite.sql.parser.SqlParser
@@ -158,7 +157,7 @@ abstract class TableEnvironment(val config: TableConfig) {
* user-defined functions under this name.
*/
def registerFunction(name: String, function: ScalarFunction): Unit = {
- // check could be instantiated
+ // check if class could be instantiated
checkForInstantiation(function.getClass)
// register in Table API
@@ -174,9 +173,9 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
private[flink] def registerTableFunctionInternal[T: TypeInformation](
name: String, function: TableFunction[T]): Unit = {
- // check not Scala object
+ // check that class is not a Scala object
checkNotSingleton(function.getClass)
- // check could be instantiated
+ // check if class could be instantiated
checkForInstantiation(function.getClass)
val typeInfo: TypeInformation[_] = if (function.getResultType != null) {
@@ -187,6 +186,7 @@ abstract class TableEnvironment(val config: TableConfig) {
// register in Table API
functionCatalog.registerFunction(name, function.getClass)
+
// register in SQL API
val sqlFunctions = createTableSqlFunctions(name, function, typeInfo, typeFactory)
functionCatalog.registerSqlFunctions(sqlFunctions)
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
index 9e4f569..f7d6863 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenerator.scala
@@ -32,6 +32,7 @@ import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.typeutils.{GenericTypeInfo, PojoTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.GeneratedExpression.{NEVER_NULL, NO_CODE}
import org.apache.flink.api.table.codegen.Indenter.toISC
import org.apache.flink.api.table.codegen.calls.FunctionGenerator
import org.apache.flink.api.table.codegen.calls.ScalarOperators._
@@ -358,10 +359,11 @@ class CodeGenerator(
val input2AccessExprs = input2 match {
case Some(ti) => for (i <- 0 until ti.getArity)
- // use generateFieldAccess instead of generateInputAccess to avoid the generated table
- // function's field access code is put on the top of function body rather than the while loop
+ // use generateFieldAccess instead of generateInputAccess so that the generated
+ // table function's field access code is placed in the while loop rather than
+ // at the top of the function body
yield generateFieldAccess(ti, input2Term, i, input2PojoFieldMapping)
- case None => throw new CodeGenException("type information of input2 must not be null")
+ case None => throw new CodeGenException("Type information of input2 must not be null.")
}
(input1AccessExprs, input2AccessExprs)
}
@@ -781,7 +783,7 @@ class CodeGenerator(
}
override def visitCorrelVariable(correlVariable: RexCorrelVariable): GeneratedExpression = {
- GeneratedExpression(input1Term, GeneratedExpression.NEVER_NULL, "", input1)
+ GeneratedExpression(input1Term, NEVER_NULL, NO_CODE, input1)
}
override def visitLocalRef(localRef: RexLocalRef): GeneratedExpression =
@@ -1019,8 +1021,7 @@ class CodeGenerator(
case None =>
val expr = if (nullableInput) {
generateNullableInputFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
- }
- else {
+ } else {
generateFieldAccess(inputType, inputTerm, index, pojoFieldMapping)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
index 27cb43f..37e70e4 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/calls/TableFunctionCallGen.scala
@@ -20,6 +20,7 @@ package org.apache.flink.api.table.codegen.calls
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.codegen.CodeGenUtils._
+import org.apache.flink.api.table.codegen.GeneratedExpression.NEVER_NULL
import org.apache.flink.api.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression}
import org.apache.flink.api.table.functions.TableFunction
import org.apache.flink.api.table.functions.utils.UserDefinedFunctionUtils._
@@ -75,7 +76,7 @@ class TableFunctionCallGen(
// has no result
GeneratedExpression(
functionReference,
- GeneratedExpression.NEVER_NULL,
+ NEVER_NULL,
functionCallCode,
returnType)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
index 6cd63ff..a926717 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/ExpressionParser.scala
@@ -204,8 +204,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
}
lazy val suffixAs: PackratParser[Expression] =
- composite ~ "." ~ AS ~ "(" ~ fieldReference ~ ")" ^^ {
- case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.name)
+ composite ~ "." ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case e ~ _ ~ _ ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
}
lazy val suffixTrim = composite ~ "." ~ TRIM ~ "(" ~ trimMode ~ "," ~ expression ~ ")" ^^ {
@@ -325,8 +325,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
}
lazy val prefixAs: PackratParser[Expression] =
- AS ~ "(" ~ expression ~ "," ~ fieldReference ~ ")" ^^ {
- case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.name)
+ AS ~ "(" ~ expression ~ "," ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case _ ~ _ ~ e ~ _ ~ target ~ _ => Alias(e, target.head.name, target.tail.map(_.name))
}
lazy val prefixIf: PackratParser[Expression] =
@@ -447,8 +447,8 @@ object ExpressionParser extends JavaTokenParsers with PackratParsers {
lazy val alias: PackratParser[Expression] = logic ~ AS ~ fieldReference ^^ {
case e ~ _ ~ name => Alias(e, name.name)
- } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
- case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.drop(1).map(_.name))
+ } | logic ~ AS ~ "(" ~ rep1sep(fieldReference, ",") ~ ")" ^^ {
+ case e ~ _ ~ _ ~ names ~ _ => Alias(e, names.head.name, names.tail.map(_.name))
} | logic
lazy val expression: PackratParser[Expression] = alias |
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
index 3e8d8b1..3bb9dac 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/call.scala
@@ -89,10 +89,8 @@ case class ScalarFunctionCall(
ValidationSuccess
}
}
-
}
-
/**
*
* Expression for calling a user-defined table function with actual parameters.
@@ -114,10 +112,10 @@ case class TableFunctionCall(
override private[flink] def children: Seq[Expression] = parameters
/**
- * Assigns an alias for this table function returned fields that the following `select()` clause
+ * Assigns an alias for this table function's returned fields that the following operator
* can refer to.
*
- * @param aliasList alias for this table function returned fields
+ * @param aliasList alias for this table function's returned fields
* @return this table function call
*/
private[flink] def as(aliasList: Option[Seq[String]]): TableFunctionCall = {
@@ -155,4 +153,7 @@ case class TableFunctionCall(
fieldNames,
child)
}
+
+ override def toString =
+ s"${tableFunction.getClass.getCanonicalName}(${parameters.mkString(", ")})"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
index 86d9d66..2e16096 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/ScalarFunction.scala
@@ -18,15 +18,11 @@
package org.apache.flink.api.table.functions
-import java.lang.reflect.{Method, Modifier}
-
-import org.apache.calcite.sql.SqlFunction
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.api.table.ValidationException
import org.apache.flink.api.table.expressions.{Expression, ScalarFunctionCall}
-import org.apache.flink.api.table.functions.utils.ScalarSqlFunction
-import org.apache.flink.api.table.{FlinkTypeFactory, ValidationException}
/**
* Base class for a user-defined scalar function. A user-defined scalar functions maps zero, one,
@@ -60,6 +56,7 @@ abstract class ScalarFunction extends UserDefinedFunction {
ScalarFunctionCall(this, params)
}
+ override def toString: String = getClass.getCanonicalName
// ----------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
index 98a2921..3a56efb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/TableFunction.scala
@@ -20,18 +20,16 @@ package org.apache.flink.api.table.functions
import java.util
-import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.TypeExtractor
-import org.apache.flink.api.table.ValidationException
+import org.apache.flink.api.table.expressions.{Expression, TableFunctionCall}
/**
* Base class for a user-defined table function (UDTF). A user-defined table functions works on
* zero, one, or multiple scalar values as input and returns multiple rows as output.
*
* The behavior of a [[TableFunction]] can be defined by implementing a custom evaluation
- * method. An evaluation method must be declared publicly and named "eval". Evaluation methods
- * can also be overloaded by implementing multiple methods named "eval".
+ * method. An evaluation method must be declared publicly, not static and named "eval".
+ * Evaluation methods can also be overloaded by implementing multiple methods named "eval".
*
* User-defined functions must have a default constructor and must be instantiable during runtime.
*
@@ -51,14 +49,14 @@ import org.apache.flink.api.table.ValidationException
*
* public class Split extends TableFunction<String> {
*
- * // implement an "eval" method with several parameters you want
+ * // implement an "eval" method with as many parameters as you want
* public void eval(String str) {
* for (String s : str.split(" ")) {
* collect(s); // use collect(...) to emit an output row
* }
* }
*
- * // can overloading eval method here ...
+ * // you can overload the eval method here ...
* }
*
* val tEnv: TableEnvironment = ...
@@ -82,6 +80,25 @@ import org.apache.flink.api.table.ValidationException
*/
abstract class TableFunction[T] extends UserDefinedFunction {
+ /**
+ * Creates a call to a [[TableFunction]] in the Scala Table API.
+ *
+ * @param params actual parameters of the function
+ * @return [[Expression]] in the form of a [[TableFunctionCall]]
+ */
+ final def apply(params: Expression*)(implicit typeInfo: TypeInformation[T]): Expression = {
+ val resultType = if (getResultType == null) {
+ typeInfo
+ } else {
+ getResultType
+ }
+ TableFunctionCall(getClass.getSimpleName, this, params, resultType)
+ }
+
+ override def toString: String = getClass.getCanonicalName
+
+ // ----------------------------------------------------------------------------------------------
+
private val rows: util.ArrayList[T] = new util.ArrayList[T]()
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
index 0a987aa..7953b25 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/ScalarSqlFunction.scala
@@ -125,6 +125,7 @@ object ScalarSqlFunction {
: SqlOperandTypeChecker = {
val signatures = getSignatures(scalarFunction)
+
/**
* Operand type checker based on [[ScalarFunction]] given information.
*/
@@ -178,5 +179,4 @@ object ScalarSqlFunction {
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
index 6eadfbc..738238d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/TableSqlFunction.scala
@@ -33,7 +33,6 @@ import org.apache.flink.api.table.plan.schema.FlinkTableFunctionImpl
import scala.collection.JavaConverters._
import java.util
-
/**
* Calcite wrapper for user-defined table functions.
*/
@@ -55,31 +54,33 @@ class TableSqlFunction(
functionImpl) {
/**
- * Get the user-defined table function
+ * Get the user-defined table function.
*/
def getTableFunction = udtf
/**
- * Get the returned table type information of the table function
+ * Get the type information of the table returned by the table function.
*/
def getRowTypeInfo = rowTypeInfo
/**
* Get additional mapping information if the returned table type is a POJO
- * (POJO types have no deterministic field order)
+ * (POJO types have no deterministic field order).
*/
def getPojoFieldMapping = functionImpl.fieldIndexes
}
object TableSqlFunction {
+
/**
- * Util function to create a [[TableSqlFunction]]
+ * Util function to create a [[TableSqlFunction]].
+ *
* @param name function name (used by SQL parser)
- * @param udtf user defined table function to be called
+ * @param udtf user-defined table function to be called
* @param rowTypeInfo the row type information generated by the table function
* @param typeFactory type factory for converting Flink's between Calcite's types
- * @param functionImpl calcite table function schema
+ * @param functionImpl Calcite table function schema
* @return [[TableSqlFunction]]
*/
def apply(
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
index 932baeb..4899691 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -141,13 +141,17 @@ object UserDefinedFunctionUtils {
.getDeclaredMethods
.filter { m =>
val modifiers = m.getModifiers
- m.getName == "eval" && Modifier.isPublic(modifiers) && !Modifier.isAbstract(modifiers)
+ m.getName == "eval" &&
+ Modifier.isPublic(modifiers) &&
+ !Modifier.isAbstract(modifiers) &&
+ !(function.isInstanceOf[TableFunction[_]] && Modifier.isStatic(modifiers))
}
if (methods.isEmpty) {
throw new ValidationException(
s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
- s"one method named 'eval' which is public and not abstract.")
+ s"one method named 'eval' which is public, not abstract and " +
+ s"(in case of table functions) not static.")
} else {
methods
}
@@ -158,7 +162,7 @@ object UserDefinedFunctionUtils {
}
// ----------------------------------------------------------------------------------------------
- // Utilities for sql functions
+ // Utilities for SQL functions
// ----------------------------------------------------------------------------------------------
/**
@@ -255,7 +259,7 @@ object UserDefinedFunctionUtils {
* Field names are automatically extracted for
* [[org.apache.flink.api.common.typeutils.CompositeType]].
*
- * @param inputType The TypeInformation extract the field names and positions from.
+ * @param inputType The TypeInformation to extract the field names and positions from.
* @return A tuple of two arrays holding the field names and corresponding field positions.
*/
def getFieldInfo(inputType: TypeInformation[_])
@@ -265,8 +269,8 @@ object UserDefinedFunctionUtils {
case t: CompositeType[_] => t.getFieldNames
case a: AtomicType[_] => Array("f0")
case tpe =>
- throw new TableException(s"Currently only support CompositeType and AtomicType. " +
- s"Type $tpe lacks explicit field naming")
+ throw new TableException(s"Currently only CompositeType and AtomicType are supported. " +
+ s"Type $tpe lacks explicit field naming")
}
val fieldIndexes = fieldNames.indices.toArray
val fieldTypes: Array[TypeInformation[_]] = fieldNames.map { i =>
@@ -274,7 +278,7 @@ object UserDefinedFunctionUtils {
case t: CompositeType[_] => t.getTypeAt(i).asInstanceOf[TypeInformation[_]]
case a: AtomicType[_] => a.asInstanceOf[TypeInformation[_]]
case tpe =>
- throw new TableException(s"Currently only support CompositeType and AtomicType.")
+ throw new TableException(s"Currently only CompositeType and AtomicType are supported.")
}
}
(fieldNames, fieldIndexes, fieldTypes)
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
index f6ddeef..c093f1a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/ProjectionTranslator.scala
@@ -49,9 +49,7 @@ object ProjectionTranslator {
val replaced = exprs
.map(replaceAggregationsAndProperties(_, tableEnv, aggNames, propNames))
- .map {
- case e: Expression => UnresolvedAlias(e)
- }
+ .map(UnresolvedAlias)
val aggs = aggNames.map( a => Alias(a._1, a._2)).toSeq
val props = propNames.map( p => Alias(p._1, p._2)).toSeq
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
index 4dc2ab7..438698a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/logical/operators.scala
@@ -428,7 +428,6 @@ case class Join(
right.construct(relBuilder)
val corSet = mutable.Set[CorrelationId]()
-
if (correlated) {
corSet += relBuilder.peek().getCluster.createCorrel()
}
@@ -624,9 +623,9 @@ case class WindowAggregate(
}
}
-
/**
* LogicalNode for calling a user-defined table functions.
+ *
* @param functionName function name
* @param tableFunction table function to be called (might be overloaded)
* @param parameters actual parameters
@@ -634,16 +633,16 @@ case class WindowAggregate(
* @param child child logical node
*/
case class LogicalTableFunctionCall(
- functionName: String,
- tableFunction: TableFunction[_],
- parameters: Seq[Expression],
- resultType: TypeInformation[_],
- fieldNames: Array[String],
- child: LogicalNode)
+ functionName: String,
+ tableFunction: TableFunction[_],
+ parameters: Seq[Expression],
+ resultType: TypeInformation[_],
+ fieldNames: Array[String],
+ child: LogicalNode)
extends UnaryNode {
- val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
- var evalMethod: Method = _
+ private val (_, fieldIndexes, fieldTypes) = getFieldInfo(resultType)
+ private var evalMethod: Method = _
override def output: Seq[Attribute] = fieldNames.zip(fieldTypes).map {
case (n, t) => ResolvedFieldReference(n, t)
@@ -651,9 +650,9 @@ case class LogicalTableFunctionCall(
override def validate(tableEnv: TableEnvironment): LogicalNode = {
val node = super.validate(tableEnv).asInstanceOf[LogicalTableFunctionCall]
- // check not Scala object
+ // check that the table function is not a Scala object
checkNotSingleton(tableFunction.getClass)
- // check could be instantiated
+ // check that the class can be instantiated
checkForInstantiation(tableFunction.getClass)
// look for a signature that matches the input types
val signature = node.parameters.map(_.resultType)
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
index 9745be1..93a8f53 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCorrelate.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.table.codegen.{CodeGenerator, GeneratedExpression, GeneratedFunction}
import org.apache.flink.api.table.codegen.CodeGenUtils.primitiveDefaultValue
+import org.apache.flink.api.table.codegen.GeneratedExpression.{ALWAYS_NULL, NO_CODE}
import org.apache.flink.api.table.functions.utils.TableSqlFunction
import org.apache.flink.api.table.runtime.FlatMapRunner
import org.apache.flink.api.table.typeutils.TypeConverter._
@@ -73,12 +74,12 @@ trait FlinkCorrelate {
// outer apply
// in case of outer apply and the returned row of table function is empty,
- // fill null to all fields of the row
+ // fill all fields of row with null
val input2NullExprs = input2AccessExprs.map { x =>
GeneratedExpression(
primitiveDefaultValue(x.resultType),
- GeneratedExpression.ALWAYS_NULL,
- "",
+ ALWAYS_NULL,
+ NO_CODE,
x.resultType)
}
val outerResultExpr = generator.generateResultExpression(
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
index 4aa7fea..3cddf8b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetCorrelate.scala
@@ -99,7 +99,7 @@ class DataSetCorrelate(
config.getNullCheck,
config.getEfficientTypeUsage)
- // do not need to specify input type
+ // we do not need to specify input type
val inputDS = inputNode.asInstanceOf[DataSetRel].translateToPlan(tableEnv)
val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
index b0bc48a..028cb10 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/datastream/DataStreamCorrelate.scala
@@ -48,6 +48,7 @@ class DataStreamCorrelate(
extends SingleRel(cluster, traitSet, inputNode)
with FlinkCorrelate
with DataStreamRel {
+
override def deriveRowType() = relRowType
override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
@@ -92,7 +93,7 @@ class DataStreamCorrelate(
config.getNullCheck,
config.getEfficientTypeUsage)
- // do not need to specify input type
+ // we do not need to specify input type
val inputDS = inputNode.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
val funcRel = scan.asInstanceOf[LogicalTableFunctionScan]
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
index e6cf0cf..39756be 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataSet/DataSetCorrelateRule.scala
@@ -33,8 +33,7 @@ class DataSetCorrelateRule
classOf[LogicalCorrelate],
Convention.NONE,
DataSetConvention.INSTANCE,
- "DataSetCorrelateRule")
- {
+ "DataSetCorrelateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
@@ -46,7 +45,9 @@ class DataSetCorrelateRule
case scan: LogicalTableFunctionScan => true
// a filter is pushed above the table function
case filter: LogicalFilter =>
- filter.getInput.asInstanceOf[RelSubset].getOriginal
+ filter
+ .getInput.asInstanceOf[RelSubset]
+ .getOriginal
.isInstanceOf[LogicalTableFunctionScan]
case _ => false
}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
index bb52fd7..554c6c1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/datastream/DataStreamCorrelateRule.scala
@@ -33,8 +33,7 @@ class DataStreamCorrelateRule
classOf[LogicalCorrelate],
Convention.NONE,
DataStreamConvention.INSTANCE,
- "DataStreamCorrelateRule")
-{
+ "DataStreamCorrelateRule") {
override def matches(call: RelOptRuleCall): Boolean = {
val join: LogicalCorrelate = call.rel(0).asInstanceOf[LogicalCorrelate]
@@ -45,7 +44,9 @@ class DataStreamCorrelateRule
case scan: LogicalTableFunctionScan => true
// a filter is pushed above the table function
case filter: LogicalFilter =>
- filter.getInput.asInstanceOf[RelSubset].getOriginal
+ filter
+ .getInput.asInstanceOf[RelSubset]
+ .getOriginal
.isInstanceOf[LogicalTableFunctionScan]
case _ => false
}
@@ -63,8 +64,9 @@ class DataStreamCorrelateRule
convertToCorrelate(rel.getRelList.get(0), condition)
case filter: LogicalFilter =>
- convertToCorrelate(filter.getInput.asInstanceOf[RelSubset].getOriginal,
- Some(filter.getCondition))
+ convertToCorrelate(
+ filter.getInput.asInstanceOf[RelSubset].getOriginal,
+ Some(filter.getCondition))
case scan: LogicalTableFunctionScan =>
new DataStreamCorrelate(
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
index a75f2fc..b421c8e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/table.scala
@@ -611,9 +611,9 @@ class Table(
}
/**
- * The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
- * that produces matching values from the table-valued function (which is on the right side of
- * the operator).
+ * The Cross Apply operator returns rows from the outer table (table on the left of the
+ * operator) that produce matching values from the table-valued function (which is defined in
+ * the expression on the right side of the operator).
*
* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
*
@@ -635,23 +635,25 @@ class Table(
}
/**
- * The Cross Apply returns rows from the outer table (table on the left of the Apply operator)
- * that produces matching values from the table-valued function (which is on the right side of
- * the operator).
+ * The Cross Apply operator returns rows from the outer table (table on the left of the
+ * operator) that produce matching values from the table-valued function (which is defined in
+ * the expression on the right side of the operator).
*
* The Cross Apply is equivalent to Inner Join, but it works with a table-valued function.
*
* Example:
*
* {{{
- * class MySplitUDTF extends TableFunction[String] {
- * def eval(str: String): Unit = {
- * str.split("#").foreach(collect)
+ * class MySplitUDTF extends TableFunction<String> {
+ * public void eval(String str) {
+ * for (String s : str.split("#")) { collect(s); }
* }
* }
*
- * val split = new MySplitUDTF()
- * table.crossApply("split(c) as (s)").select("a, b, c, s")
+ * TableFunction<String> split = new MySplitUDTF();
+ * tableEnv.registerFunction("split", split);
+ *
+ * table.crossApply("split(c) as (s)").select("a, b, c, s");
* }}}
*/
def crossApply(udtf: String): Table = {
@@ -659,9 +661,10 @@ class Table(
}
/**
- * The Outer Apply returns all the rows from the outer table (table on the left of the Apply
- * operator), and rows that do not matches the condition from the table-valued function (which
- * is on the right side of the operator), NULL values are displayed.
+ * The Outer Apply operator returns all the rows from the outer table (table on the left of the
+ * Apply operator), and rows that do not match the condition from the table-valued function
+ * (which is defined in the expression on the right side of the operator).
+ * Rows with no matching condition are filled with null values.
*
* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
*
@@ -683,17 +686,26 @@ class Table(
}
/**
- * The Outer Apply returns all the rows from the outer table (table on the left of the Apply
- * operator), and rows that do not matches the condition from the table-valued function (which
- * is on the right side of the operator), NULL values are displayed.
+ * The Outer Apply operator returns all the rows from the outer table (table on the left of the
+ * Apply operator), and rows that do not match the condition from the table-valued function
+ * (which is defined in the expression on the right side of the operator).
+ * Rows with no matching condition are filled with null values.
*
* The Outer Apply is equivalent to Left Outer Join, but it works with a table-valued function.
*
* Example:
*
* {{{
- * val split = new MySplitUDTF()
- * table.outerApply("split(c) as (s)").select("a, b, c, s")
+ * class MySplitUDTF extends TableFunction<String> {
+ * public void eval(String str) {
+ * for (String s : str.split("#")) { collect(s); }
+ * }
+ * }
+ *
+ * TableFunction<String> split = new MySplitUDTF();
+ * tableEnv.registerFunction("split", split);
+ *
+ * table.outerApply("split(c) as (s)").select("a, b, c, s");
* }}}
*/
def outerApply(udtf: String): Table = {
@@ -708,7 +720,7 @@ class Table(
private def applyInternal(udtf: Expression, joinType: JoinType): Table = {
var alias: Option[Seq[String]] = None
- // unwrap an Expression until get a TableFunctionCall
+ // unwrap an Expression until we get a TableFunctionCall
def unwrap(expr: Expression): TableFunctionCall = expr match {
case Alias(child, name, extraNames) =>
alias = Some(Seq(name) ++ extraNames)
@@ -717,7 +729,9 @@ class Table(
val function = tableEnv.getFunctionCatalog.lookupFunction(name, args)
unwrap(function)
case c: TableFunctionCall => c
- case _ => throw new TableException("Cross/Outer Apply only accept TableFunction")
+ case _ =>
+ throw new TableException(
+ "Cross/Outer Apply operators only accept expressions that define table functions.")
}
val call = unwrap(udtf)
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
index 4029a7d..dc68b89 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/validate/FunctionCatalog.scala
@@ -47,16 +47,18 @@ class FunctionCatalog {
sqlFunctions += sqlFunction
}
- /** Register multiple sql functions at one time. The functions has the same name. **/
+ /**
+ * Register multiple SQL functions at the same time. The functions have the same name.
+ */
def registerSqlFunctions(functions: Seq[SqlFunction]): Unit = {
if (functions.nonEmpty) {
val name = functions.head.getName
- // check all name is the same in the functions
+ // check that all functions have the same name
if (functions.forall(_.getName == name)) {
sqlFunctions --= sqlFunctions.filter(_.getName == name)
sqlFunctions ++= functions
} else {
- throw ValidationException("The sql functions request to register have different name.")
+ throw ValidationException("The SQL functions to be registered have different names.")
}
}
}
@@ -88,7 +90,7 @@ class FunctionCatalog {
case tf if classOf[TableFunction[_]].isAssignableFrom(tf) =>
val tableSqlFunction = sqlFunctions
.find(f => f.getName.equalsIgnoreCase(name) && f.isInstanceOf[TableSqlFunction])
- .getOrElse(throw ValidationException(s"Unregistered table sql function: $name"))
+ .getOrElse(throw ValidationException(s"Undefined table function: $name"))
.asInstanceOf[TableSqlFunction]
val typeInfo = tableSqlFunction.getRowTypeInfo
val function = tableSqlFunction.getTableFunction
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/resources/log4j-test.properties b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
index 4c74d85..f713aa8 100644
--- a/flink-libraries/flink-table/src/test/resources/log4j-test.properties
+++ b/flink-libraries/flink-table/src/test/resources/log4j-test.properties
@@ -18,7 +18,7 @@
# Set root logger level to OFF to not flood build logs
# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
# A1 is set to be a ConsoleAppender.
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
deleted file mode 100644
index 7e0d0ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionITCase.scala
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.utils._
-import org.apache.flink.api.table.{Row, Table, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-@RunWith(classOf[Parameterized])
-class UserDefinedTableFunctionITCase(
- mode: TestExecutionMode,
- configMode: TableConfigMode)
- extends TableProgramsTestBase(mode, configMode) {
-
- @Test
- def testSQLCrossApply(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- tableEnv.registerTable("MyTable", in)
- tableEnv.registerFunction("split", new TableFunc1)
-
- val sqlQuery = "SELECT MyTable.c, t.s FROM MyTable, LATERAL TABLE(split(c)) AS t(s)"
-
- val result = tableEnv.sql(sqlQuery).toDataSet[Row]
- val results = result.collect()
- val expected: String = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
- "Anna#44,Anna\n" + "Anna#44,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testSQLOuterApply(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- tableEnv.registerTable("MyTable", in)
- tableEnv.registerFunction("split", new TableFunc2)
-
- val sqlQuery = "SELECT MyTable.c, t.a, t.b FROM MyTable LEFT JOIN LATERAL TABLE(split(c)) " +
- "AS t(a,b) ON TRUE"
-
- val result = tableEnv.sql(sqlQuery).toDataSet[Row]
- val results = result.collect()
- val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
- "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testTableAPICrossApply(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val func1 = new TableFunc1
- val result = in.crossApply(func1('c) as ('s)).select('c, 's).toDataSet[Row]
- val results = result.collect()
- val expected: String = "Jack#22,Jack\n" + "Jack#22,22\n" + "John#19,John\n" + "John#19,19\n" +
- "Anna#44,Anna\n" + "Anna#44,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
-
- // with overloading
- val result2 = in.crossApply(func1('c, "$") as ('s)).select('c, 's).toDataSet[Row]
- val results2 = result2.collect()
- val expected2: String = "Jack#22,$Jack\n" + "Jack#22,$22\n" + "John#19,$John\n" +
- "John#19,$19\n" + "Anna#44,$Anna\n" + "Anna#44,$44\n"
- TestBaseUtils.compareResultAsText(results2.asJava, expected2)
- }
-
-
- @Test
- def testTableAPIOuterApply(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- val func2 = new TableFunc2
- val result = in.outerApply(func2('c) as ('s, 'l)).select('c, 's, 'l).toDataSet[Row]
- val results = result.collect()
- val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
- "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n" + "nosharp,null,null"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testCustomReturnType(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- val func2 = new TableFunc2
-
- val result = in
- .crossApply(func2('c) as ('name, 'len))
- .select('c, 'name, 'len)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected: String = "Jack#22,Jack,4\n" + "Jack#22,22,2\n" + "John#19,John,4\n" +
- "John#19,19,2\n" + "Anna#44,Anna,4\n" + "Anna#44,44,2\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testHierarchyType(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val hierarchy = new HierarchyTableFunction
- val result = in
- .crossApply(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'adult, 'len)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected: String = "Jack#22,Jack,true,22\n" + "John#19,John,false,19\n" +
- "Anna#44,Anna,true,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
- @Test
- def testPojoType(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
-
- val pojo = new PojoTableFunc()
- val result = in
- .crossApply(pojo('c))
- .select('c, 'name, 'age)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected: String = "Jack#22,Jack,22\n" + "John#19,John,19\n" + "Anna#44,Anna,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testTableAPIWithFilter(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- val func0 = new TableFunc0
-
- val result = in
- .crossApply(func0('c) as ('name, 'age))
- .select('c, 'name, 'age)
- .filter('age > 20)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected: String = "Jack#22,Jack,22\n" + "Anna#44,Anna,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- @Test
- def testUDTFWithScalarFunction(): Unit = {
- val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
- val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
- val in: Table = getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
- val func1 = new TableFunc1
-
- val result = in
- .crossApply(func1('c.substring(2)) as 's)
- .select('c, 's)
- .toDataSet[Row]
-
- val results = result.collect()
- val expected: String = "Jack#22,ack\n" + "Jack#22,22\n" + "John#19,ohn\n" + "John#19,19\n" +
- "Anna#44,nna\n" + "Anna#44,44\n"
- TestBaseUtils.compareResultAsText(results.asJava, expected)
- }
-
-
- private def getSmall3TupleDataSet(env: ExecutionEnvironment): DataSet[(Int, Long, String)] = {
- val data = new mutable.MutableList[(Int, Long, String)]
- data.+=((1, 1L, "Jack#22"))
- data.+=((2, 2L, "John#19"))
- data.+=((3, 2L, "Anna#44"))
- data.+=((4, 3L, "nosharp"))
- env.fromCollection(data)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
deleted file mode 100644
index 7e236d1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/UserDefinedTableFunctionTest.scala
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.api.table.expressions.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc1, TableFunc2}
-import org.apache.flink.api.table.typeutils.RowTypeInfo
-import org.apache.flink.api.table.utils.TableTestBase
-import org.apache.flink.api.table.utils.TableTestUtil._
-import org.apache.flink.api.table.{Row, TableEnvironment, Types}
-import org.junit.Test
-import org.mockito.Mockito._
-
-
-class UserDefinedTableFunctionTest extends TableTestBase {
-
- @Test
- def testTableAPI(): Unit = {
- // mock
- val ds = mock(classOf[DataSet[Row]])
- val jDs = mock(classOf[JDataSet[Row]])
- val typeInfo: TypeInformation[Row] = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING))
- when(ds.javaSet).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataSet(jDs).as("a, b, c")
- javaTableEnv.registerTable("MyTable", in2)
-
- // test cross apply
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.crossApply(func1('c) as ('s)).select('c, 's)
- var javaTable = in2.crossApply("func1(c) as (s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test outer apply
- scalaTable = in1.outerApply(func1('c) as ('s)).select('c, 's)
- javaTable = in2.outerApply("func1(c) as (s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = in1.crossApply(func1('c, "$") as ('s)).select('c, 's)
- javaTable = in2.crossApply("func1(c, '$') as (s)").select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.crossApply(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.crossApply("func2(c) as (name, len)").select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.crossApply(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'len, 'adult)
- javaTable = in2.crossApply("hierarchy(c) as (name, adult, len)")
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.crossApply(pojo('c))
- .select('c, 'name, 'age)
- javaTable = in2.crossApply("pojo(c)")
- .select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = in1.crossApply(func2('c) as ('name, 'len))
- .select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.crossApply("func2(c) as (name, len)")
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = in1.crossApply(func1('c.substring(2)) as ('s))
- .select('a, 'c, 's)
- javaTable = in2.crossApply("func1(substring(c, 2)) as (s)")
- .select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
- }
-
- @Test
- def testSQLWithCrossApply(): Unit = {
- val util = batchTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
-
- // test overloading
-
- val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
-
- val expected2 = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func1($cor0.c, '$')"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery2, expected2)
- }
-
- @Test
- def testSQLWithOuterApply(): Unit = {
- val util = batchTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func1($cor0.c)"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "LEFT")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSQLWithCustomType(): Unit = {
- val util = batchTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSQLWithHierarchyType(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new HierarchyTableFunction
- util.addFunction("hierarchy", function)
-
- val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "hierarchy($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSQLWithPojoType(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val function = new PojoTableFunc
- util.addFunction("pojo", function)
-
- val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "pojo($cor0.c)"),
- term("function", function.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
- " INTEGER age, VARCHAR(2147483647) name)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "name", "age")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testSQLWithFilter(): Unit = {
- val util = batchTestUtil()
- val func2 = new TableFunc2
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func2", func2)
-
- val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
- "WHERE len > 2"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func2($cor0.c)"),
- term("function", func2.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
- "VARCHAR(2147483647) f0, INTEGER f1)"),
- term("joinType", "INNER"),
- term("condition", ">($1, 2)")
- ),
- term("select", "c", "f0 AS name", "f1 AS len")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
-
- @Test
- def testSQLWithScalarFunction(): Unit = {
- val util = batchTestUtil()
- val func1 = new TableFunc1
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- util.addFunction("func1", func1)
-
- val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
-
- val expected = unaryNode(
- "DataSetCalc",
- unaryNode(
- "DataSetCorrelate",
- batchTableNode(0),
- term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
- term("function", func1.getClass.getCanonicalName),
- term("rowType",
- "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
- term("joinType", "INNER")
- ),
- term("select", "c", "f0 AS s")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/684defbf/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..1c505ba
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
+import org.apache.flink.api.table.utils._
+import org.apache.flink.api.table.utils.TableTestUtil._
+import org.junit.Test
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+ @Test
+ def testCrossApply(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c)"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+
+ // test overloading
+
+ val sqlQuery2 = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)"
+
+ val expected2 = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c, '$')"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery2, expected2)
+ }
+
+ @Test
+ def testOuterApply(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1($cor0.c)"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "LEFT")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testCustomType(): Unit = {
+ val util = batchTestUtil()
+ val func2 = new TableFunc2
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func2", func2)
+
+ val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func2($cor0.c)"),
+ term("function", func2.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+ "VARCHAR(2147483647) f0, INTEGER f1)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testHierarchyType(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = new HierarchyTableFunction
+ util.addFunction("hierarchy", function)
+
+ val sqlQuery = "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "hierarchy($cor0.c)"),
+ term("function", function.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+ " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testPojoType(): Unit = {
+ val util = batchTestUtil()
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ val function = new PojoTableFunc
+ util.addFunction("pojo", function)
+
+ val sqlQuery = "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "pojo($cor0.c)"),
+ term("function", function.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+ " INTEGER age, VARCHAR(2147483647) name)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "name", "age")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testFilter(): Unit = {
+ val util = batchTestUtil()
+ val func2 = new TableFunc2
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func2", func2)
+
+ val sqlQuery = "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+ "WHERE len > 2"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func2($cor0.c)"),
+ term("function", func2.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+ "VARCHAR(2147483647) f0, INTEGER f1)"),
+ term("joinType", "INNER"),
+ term("condition", ">($1, 2)")
+ ),
+ term("select", "c", "f0 AS name", "f1 AS len")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+
+ @Test
+ def testScalarFunction(): Unit = {
+ val util = batchTestUtil()
+ val func1 = new TableFunc1
+ util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+ util.addFunction("func1", func1)
+
+ val sqlQuery = "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)"
+
+ val expected = unaryNode(
+ "DataSetCalc",
+ unaryNode(
+ "DataSetCorrelate",
+ batchTableNode(0),
+ term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
+ term("function", func1.getClass.getCanonicalName),
+ term("rowType",
+ "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"),
+ term("joinType", "INNER")
+ ),
+ term("select", "c", "f0 AS s")
+ )
+
+ util.verifySql(sqlQuery, expected)
+ }
+}