You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:31 UTC
[flink] 01/05: [FLINK-13107][table-planner-blink] Introduce
fieldNames to TypedFlinkTableFunction so that the specified
field names are used.
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5338ec782839e8bc0142334b9db99a4239954de9
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 8 14:45:02 2019 +0800
[FLINK-13107][table-planner-blink] Introduce fieldNames to TypedFlinkTableFunction so that the specified field names are used.
---
.../flink/table/plan/QueryOperationConverter.java | 4 +-
.../functions/utils/UserDefinedFunctionUtils.scala | 12 +-
.../plan/schema/TypedFlinkTableFunction.scala | 22 +-
.../flink/table/plan/util/SetOpRewriteUtil.scala | 2 +-
.../apache/flink/table/util/FieldInfoUtils.scala | 218 ------------
.../runtime/batch/table/CorrelateITCase.scala | 23 +-
.../runtime/stream/table/CorrelateITCase.scala | 364 +++++++++++++++++++++
.../apache/flink/table/util/TableTestBase.scala | 18 +-
8 files changed, 405 insertions(+), 258 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 12d7038..e05f0ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -266,8 +266,10 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
public <U> RelNode visit(CalculatedQueryOperation<U> calculatedTable) {
DataType resultType = fromLegacyInfoToDataType(calculatedTable.getResultType());
TableFunction<?> tableFunction = calculatedTable.getTableFunction();
+ String[] fieldNames = calculatedTable.getTableSchema().getFieldNames();
+
TypedFlinkTableFunction function = new TypedFlinkTableFunction(
- tableFunction, resultType);
+ tableFunction, fieldNames, resultType);
FlinkTypeFactory typeFactory = relBuilder.getTypeFactory();
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 0da6142..7630fe6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -22,7 +22,7 @@ package org.apache.flink.table.functions.utils
import org.apache.flink.api.common.functions.InvalidTypesException
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils._
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.dataformat.{BaseRow, BinaryString, Decimal}
import org.apache.flink.table.functions._
@@ -31,11 +31,12 @@ import org.apache.flink.table.types.ClassDataTypeConverter.fromClassToDataType
import org.apache.flink.table.types.ClassLogicalTypeConverter.{getDefaultExternalClassForType, getInternalClassForType}
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType}
import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.table.typeutils.TypeCheckUtils.isAny
-import org.apache.flink.table.util.FieldInfoUtils
import org.apache.flink.types.Row
import org.apache.flink.util.InstantiationUtil
@@ -656,10 +657,11 @@ object UserDefinedFunctionUtils {
*/
def getFieldInfo(inputType: DataType)
: (Array[String], Array[Int], Array[LogicalType]) = {
+ val inputTypeInfo = fromDataTypeToTypeInfo(inputType)
(
- FieldInfoUtils.getFieldNames(inputType),
- FieldInfoUtils.getFieldIndices(inputType),
- FieldInfoUtils.getFieldTypes(inputType))
+ FieldInfoUtils.getFieldNames(inputTypeInfo),
+ FieldInfoUtils.getFieldIndices(inputTypeInfo),
+ FieldInfoUtils.getFieldTypes(inputTypeInfo).map(fromTypeInfoToLogicalType))
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
index b0fe4dd..acc1533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
@@ -21,7 +21,9 @@ package org.apache.flink.table.plan.schema
import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
@@ -34,6 +36,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
*/
class TypedFlinkTableFunction(
val tableFunction: TableFunction[_],
+ fieldNames: Array[String],
val externalResultType: DataType)
extends FlinkTableFunction(tableFunction) {
@@ -46,8 +49,19 @@ class TypedFlinkTableFunction(
typeFactory: RelDataTypeFactory,
arguments: Array[AnyRef],
argTypes: Array[Class[_]]): RelDataType = {
- // we have determined the row type before, just convert it to RelDataType
- typeFactory.asInstanceOf[FlinkTypeFactory].createFieldTypeFromLogicalType(
- fromDataTypeToLogicalType(externalResultType))
+ val fieldTypes = FieldInfoUtils.getFieldTypes(
+ fromDataTypeToTypeInfo(externalResultType)).map(fromTypeInfoToLogicalType)
+ if (fieldTypes.length < fieldNames.length) {
+ throw new RuntimeException(s"fieldTypes: $fieldTypes, but fieldNames: $fieldNames")
+ }
+
+ val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+ val builder = flinkTypeFactory.builder
+ fieldNames
+ .zip(fieldTypes.dropRight(fieldTypes.length - fieldNames.length))
+ .foreach { f =>
+ builder.add(f._1, flinkTypeFactory.createFieldTypeFromLogicalType(f._2))
+ }
+ builder.build
}
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
index 5030694..f300645 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
@@ -81,7 +81,7 @@ object SetOpRewriteUtil {
val fieldTypes = logicalType.getChildren.map(fromLogicalTypeToTypeInfo).toArray
val tf = new ReplicateRows(fieldTypes)
val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames))
- val function = new TypedFlinkTableFunction(tf, resultType)
+ val function = new TypedFlinkTableFunction(tf, fieldNames, resultType)
val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
val sqlFunction = new TableSqlFunction(
tf.functionIdentifier,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala
deleted file mode 100644
index 3348bee..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.types.logical.{LogicalType, RowType, TypeInformationAnyType}
-import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
-import org.apache.flink.types.Row
-
-import java.lang.reflect.Modifier
-
-import scala.collection.JavaConversions._
-
-// TODO remove this, and use org.apache.flink.table.typeutils.FieldInfoUtils
-object FieldInfoUtils {
-
- /**
- * Returns field names for a given [[DataType]].
- *
- * @param inputType The DataType extract the field names.
- * @tparam A The type of the DataType.
- * @return An array holding the field names
- */
- def getFieldNames[A](inputType: DataType): Array[String] = {
- validateType(inputType)
-
- val fieldNames: Array[String] = fromDataTypeToLogicalType(inputType) match {
- case t: RowType => t.getFieldNames.toArray(Array[String]())
- case t => Array("f0")
- }
-
- if (fieldNames.contains("*")) {
- throw new TableException("Field name can not be '*'.")
- }
-
- fieldNames
- }
-
- /**
- * Returns field indexes for a given [[DataType]].
- *
- * @param inputType The DataType extract the field positions from.
- * @return An array holding the field positions
- */
- def getFieldIndices(inputType: DataType): Array[Int] = {
- getFieldNames(inputType).indices.toArray
- }
-
- /**
- * Returns field types for a given [[DataType]].
- *
- * @param inputType The DataType to extract field types from.
- * @return An array holding the field types.
- */
- def getFieldTypes(inputType: DataType): Array[LogicalType] = {
- validateType(inputType)
-
- fromDataTypeToLogicalType(inputType) match {
- case t: RowType => t.getChildren.toArray(Array[LogicalType]())
- case t => Array(t)
- }
- }
-
- /**
- * Validate if class represented by the typeInfo is static and globally accessible
- * @param dataType type to check
- * @throws TableException if type does not meet these criteria
- */
- def validateType(dataType: DataType): Unit = {
- var clazz = dataType.getConversionClass
- if (clazz == null) {
- clazz = ClassLogicalTypeConverter.getDefaultExternalClassForType(dataType.getLogicalType)
- }
- if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
- !Modifier.isPublic(clazz.getModifiers) ||
- clazz.getCanonicalName == null) {
- throw new TableException(
- s"Class '$clazz' described in type information '$dataType' must be " +
- s"static and globally accessible.")
- }
- }
-
- /**
- * Returns field names and field positions for a given [[DataType]].
- *
- * @param inputType The DataType extract the field names and positions from.
- * @tparam A The type of the DataType.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- protected[flink] def getFieldInfo[A](
- inputType: DataType): (Array[String], Array[Int]) = {
-
- val logicalType = fromDataTypeToLogicalType(inputType)
- logicalType match {
- case value: TypeInformationAnyType[A]
- if value.getTypeInformation.getTypeClass == classOf[Row] =>
- throw new TableException(
- "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
- "Please specify the type of the input with a RowTypeInfo.")
- case _ =>
- (getFieldNames(inputType), getFieldIndices(inputType))
- }
- }
-
- /**
- * Returns field names and field positions for a given [[DataType]] and [[Array]] of
- * field names. It does not handle time attributes.
- *
- * @param inputType The [[DataType]] against which the field names are referenced.
- * @param fields The fields that define the field names.
- * @tparam A The type of the DataType.
- * @return A tuple of two arrays holding the field names and corresponding field positions.
- */
- // TODO: we should support Expression fields after we introduce [Expression]
- // TODO remove this method and use FieldInfoUtils#getFieldsInfo
- protected[flink] def getFieldInfo[A](
- inputType: DataType,
- fields: Array[String]): (Array[String], Array[Int]) = {
-
- validateType(inputType)
-
- def referenceByName(name: String, ct: RowType): Option[Int] = {
- val inputIdx = ct.getFieldIndex(name)
- if (inputIdx < 0) {
- throw new TableException(s"$name is not a field of type $ct. " +
- s"Expected: ${ct.getFieldNames.mkString(", ")}")
- } else {
- Some(inputIdx)
- }
- }
-
- val indexedNames: Array[(Int, String)] = fromDataTypeToLogicalType(inputType) match {
-
- case g: TypeInformationAnyType[A]
- if g.getTypeInformation.getTypeClass == classOf[Row] ||
- g.getTypeInformation.getTypeClass == classOf[BaseRow] =>
- throw new TableException(
- "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
- "Please specify the type of the input with a RowTypeInfo.")
-
- case t: RowType =>
-
- // determine schema definition mode (by position or by name)
- val isRefByPos = isReferenceByPosition(t, fields)
-
- fields.zipWithIndex flatMap { case (name, idx) =>
- if (name.endsWith("rowtime") || name.endsWith("proctime")) {
- None
- } else {
- if (isRefByPos) {
- Some((idx, name))
- } else {
- referenceByName(name, t).map((_, name))
- }
- }
- }
-
- case _ => // atomic or other custom type information
- if (fields.length > 1) {
- // only accept the first field for an atomic type
- throw new TableException("Only accept one field to reference an atomic type.")
- }
- // first field reference is mapped to atomic type
- Array((0, fields(0)))
- }
-
- val (fieldIndexes, fieldNames) = indexedNames.unzip
-
- if (fieldNames.contains("*")) {
- throw new TableException("Field name can not be '*'.")
- }
-
- (fieldNames, fieldIndexes)
- }
-
- /**
- * Reference input fields by name:
- * All fields in the schema definition are referenced by name
- * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
- * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
- * positions using arbitrary names (except those that exist in the result schema). This mode
- * can be used for any input type, including POJOs.
- *
- * Reference input fields by position:
- * In this mode, fields are simply renamed. Event-time attributes can
- * replace the field on their position in the input data (if it is of correct type) or be
- * appended at the end. Proctime attributes must be appended at the end. This mode can only be
- * used if the input type has a defined field order (tuple, case class, Row) and no of fields
- * references a field of the input type.
- */
- // TODO: we should support Expression fields after we introduce [Expression]
- protected def isReferenceByPosition(ct: RowType, fields: Array[String]): Boolean = {
- val inputNames = ct.getFieldNames
-
- // Use the by-position mode if no of the fields exists in the input.
- // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
- // by position but the user might assume reordering instead of renaming.
- fields.forall(!inputNames.contains(_))
- }
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 0f5b954..61132be 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -35,8 +35,6 @@ import java.sql.{Date, Timestamp}
import scala.collection.JavaConverters._
import scala.collection.mutable
-// TODO Enable after TypedFlinkTableFunction take fieldNames into consideration
-@Ignore
class CorrelateITCase extends BatchTestBase {
@Test
@@ -179,6 +177,8 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ // TODO
+ @Ignore("Type question, should be fixed later.")
@Test
def testUserDefinedTableFunctionWithScalarFunctionInCondition(): Unit = {
val in = testData.as('a, 'b, 'c)
@@ -291,6 +291,8 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
+ // TODO
+ @Ignore("Add a rule to translate a Correlate without correlateSets to Join!")
@Test
def testTableFunctionWithVariableArguments(): Unit = {
val varArgsFunc0 = new VarArgsFunc0
@@ -351,21 +353,8 @@ class CorrelateITCase extends BatchTestBase {
TestBaseUtils.compareResultAsText(results.asJava, expected)
}
-// @Test
-// def testCorrelateAfterConcatAggWithConstantParam(): Unit = {
-// val in = testData.as('a, 'b, 'c)
-// val in2 = testData.as('a, 'b, 'c)
-// val func0 = new TableFunc0
-// val left = in.select('c.concat_agg("#") as 'd)
-// val result = in2.join(left).as('a, 'b, 'c, 'd)
-// .joinLateral(func0('c) as ('name, 'age))
-// .select('a, 'c, 'name, 'age)
-//
-// val results = executeQuery(result)
-// val expected = "1,Jack#22,Jack,22\n2,John#19,John,19\n3,Anna#44,Anna,44"
-// TestBaseUtils.compareResultAsText(results.asJava, expected)
-// }
-
+ // TODO
+ @Ignore("Type question, should be fixed later.")
@Test
def testTableFunctionCollectorOpenClose(): Unit = {
val t = testData.as('a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
new file mode 100644
index 0000000..a42e255
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.expressions.utils.{Func18, FuncWithOpen, RichFunc2}
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.util.{PojoTableFunc, RF, RichTableFunc1, TableFunc0, TableFunc2, TableFunc3, TableFunc6, TableFunc7, VarArgsFunc0}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.lang.{Boolean => JBoolean}
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class CorrelateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
+
+ @Test
+ def testCrossJoin(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+ val pojoFunc0 = new PojoTableFunc()
+
+ val result = t
+ .joinLateral(func0('c) as('d, 'e))
+ .select('c, 'd, 'e)
+ .joinLateral(pojoFunc0('c))
+ .where('age > 20)
+ .select('c, 'name, 'age)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testLeftOuterJoinWithoutPredicates(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+
+ val result = t
+ .leftOuterJoinLateral(func0('c) as('d, 'e))
+ .select('c, 'd, 'e)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "nosharp,null,null", "Jack#22,Jack,22",
+ "John#19,John,19", "Anna#44,Anna,44")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ /**
+ * Common join predicates are temporarily forbidden (see FLINK-7865).
+ */
+ @Test (expected = classOf[ValidationException])
+ def testLeftOuterJoinWithPredicates(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+
+ val result = t
+ .leftOuterJoinLateral(func0('c) as ('s, 'l), 'a === 'l)
+ .select('c, 's, 'l)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = "John#19,null,null\n" + "John#22,null,null\n" + "Anna44,null,null\n" +
+ "nosharp,null,null"
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testUserDefinedTableFunctionWithScalarFunction(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+
+ val result = t
+ .joinLateral(func0('c) as('d, 'e))
+ .where(Func18('d, "J"))
+ .select('c, 'd, 'e)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testUserDefinedTableFunctionWithParameter(): Unit = {
+ val tableFunc1 = new RichTableFunc1
+ tEnv.registerFunction("RichTableFunc1", tableFunc1)
+ UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " "))
+
+ val result = failingDataSource(smallTupleData3)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .joinLateral(tableFunc1('c) as 's)
+ .select('a, 's)
+
+ val sink = new TestingAppendSink
+ result.toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList("3,Hello", "3,world")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = {
+ val tableFunc1 = new RichTableFunc1
+ val richFunc2 = new RichFunc2
+ tEnv.registerFunction("RichTableFunc1", tableFunc1)
+ tEnv.registerFunction("RichFunc2", richFunc2)
+ UserDefinedFunctionTestUtils.setJobParameters(
+ env,
+ Map("word_separator" -> "#", "string.value" -> "test"))
+
+ val result = failingDataSource(smallTupleData3)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .joinLateral(tableFunc1(richFunc2('c)) as 's)
+ .select('a, 's)
+
+ val sink = new TestingAppendSink
+ result.toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,Hi",
+ "1,test",
+ "2,Hello",
+ "2,test",
+ "3,Hello world",
+ "3,test")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testTableFunctionConstructorWithParams(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val config = Map("key1" -> "value1", "key2" -> "value2")
+ val func30 = new TableFunc3(null)
+ val func31 = new TableFunc3("OneConf_")
+ val func32 = new TableFunc3("TwoConf_", config)
+
+ val result = t
+ .joinLateral(func30('c) as('d, 'e))
+ .select('c, 'd, 'e)
+ .joinLateral(func31('c) as ('f, 'g))
+ .select('c, 'd, 'e, 'f, 'g)
+ .joinLateral(func32('c) as ('h, 'i))
+ .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44",
+ "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44",
+ "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22",
+ "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22",
+ "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19",
+ "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testTableFunctionWithVariableArguments(): Unit = {
+ val varArgsFunc0 = new VarArgsFunc0
+ tEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+
+ val result = testData(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .select('c)
+ .joinLateral(varArgsFunc0("1", "2", 'c))
+
+ val sink = new TestingAppendSink
+ result.toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Anna#44,1",
+ "Anna#44,2",
+ "Anna#44,Anna#44",
+ "Jack#22,1",
+ "Jack#22,2",
+ "Jack#22,Jack#22",
+ "John#19,1",
+ "John#19,2",
+ "John#19,John#19",
+ "nosharp,1",
+ "nosharp,2",
+ "nosharp,nosharp")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testRowType(): Unit = {
+ val row = Row.of(
+ 12.asInstanceOf[Integer],
+ true.asInstanceOf[JBoolean],
+ Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+ )
+
+ val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+ val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+ val tableFunc = new TableFunc6()
+ val result = in
+ .joinLateral(tableFunc('c) as ('f0, 'f1, 'f2))
+ .select('c, 'f2)
+
+ val sink = new TestingAppendSink
+ result.toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,3,3",
+ "1,2,3,3")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testTableFunctionCollectorOpenClose(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+ val func26 = new FuncWithOpen
+ tEnv.registerFunction("func26", func26)
+ val result = t
+ .joinLateral(func0('c) as('d, 'e))
+ .where(func26('e))
+ .select('c, 'd, 'e)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ val expected = Seq (
+ "Jack#22,Jack,22",
+ "John#19,John,19",
+ "Anna#44,Anna,44"
+ )
+
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testTableFunctionCollectorInit(): Unit = {
+ val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+ val func0 = new TableFunc0
+
+ // this case will generate 'timestamp' member field and 'DateFormatter'
+ val result = t
+ .joinLateral(func0('c) as('d, 'e))
+ .where(dateFormat(currentTimestamp(), "yyyyMMdd") === 'd)
+ .select('c, 'd, 'e)
+ .toAppendStream[Row]
+
+ val sink = new TestingAppendSink
+ result.addSink(sink)
+ env.execute()
+
+ assertTrue(sink.getAppendResults.isEmpty)
+ }
+
+ @Test
+ def testFlatMap(): Unit = {
+ val func2 = new TableFunc2
+ val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
+ // test non alias
+ .flatMap(func2('c))
+ .select('f0, 'f1)
+ // test the output field name of flatMap is the same as the field name of the input table
+ .flatMap(func2(concat('f0, "#")))
+ .as ('f0, 'f1)
+ .select('f0, 'f1)
+
+ val sink = new TestingAppendSink
+ ds.toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "Jack,4",
+ "22,2",
+ "John,4",
+ "19,2",
+ "Anna,4",
+ "44,2")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testMultipleEvals(): Unit = {
+ val rf = new RF
+ val tf = new TableFunc7
+
+ val row = Row.of(
+ 12.asInstanceOf[Integer],
+ true.asInstanceOf[JBoolean],
+ Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+ )
+
+ val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+ val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+ val result = in.select(rf('a) as 'd).joinLateral(tf('d) as 'e)
+
+ val sink = new TestingAppendSink
+
+ result.toAppendStream[Row].addSink(sink)
+ env.execute()
+ }
+
+ private def testData(
+ env: StreamExecutionEnvironment)
+ : DataStream[(Int, Long, String)] = {
+
+ val data = new mutable.MutableList[(Int, Long, String)]
+ data.+=((1, 1L, "Jack#22"))
+ data.+=((2, 2L, "John#19"))
+ data.+=((3, 2L, "Anna#44"))
+ data.+=((4, 3L, "nosharp"))
+ env.fromCollection(data)
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index a59622f..fcbc4c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -18,9 +18,8 @@
package org.apache.flink.table.util
import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
@@ -50,6 +49,7 @@ import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.typeutils.FieldInfoUtils
import org.apache.flink.types.Row
import org.apache.calcite.rel.RelNode
@@ -171,13 +171,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
*/
def addTableSource[T: TypeInformation](name: String, fields: Expression*): Table = {
val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
- val fieldTypes: Array[TypeInformation[_]] = typeInfo match {
- case tt: TupleTypeInfo[_] => (0 until tt.getArity).map(tt.getTypeAt).toArray
- case ct: CaseClassTypeInfo[_] => (0 until ct.getArity).map(ct.getTypeAt).toArray
- case at: AtomicType[_] => Array[TypeInformation[_]](at)
- case _ => throw new TableException(s"Unsupported type info: $typeInfo")
- }
- val fieldsInfo = org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(
+ val fieldsInfo = FieldInfoUtils.getFieldsInfo(
typeInfo, fields.toArray)
addTableSource(name, new TestTableSource(isBatch, fieldsInfo.toTableSchema))
}
@@ -1011,7 +1005,7 @@ object TableTestUtil {
val streamType = dataStream.getType
// get field names and types for all non-replaced fields
val typeInfoSchema = fields.map((f: Array[Expression]) => {
- val fieldsInfo = org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(streamType, f)
+ val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f)
// check if event-time is enabled
if (fieldsInfo.isRowtimeDefined &&
(execEnv.getStreamTimeCharacteristic ne TimeCharacteristic.EventTime)) {
@@ -1021,7 +1015,7 @@ object TableTestUtil {
execEnv.getStreamTimeCharacteristic))
}
fieldsInfo
- }).getOrElse(org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(streamType))
+ }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType))
val fieldCnt = typeInfoSchema.getFieldTypes.length
val dataStreamQueryOperation = new DataStreamQueryOperation(