Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 11:19:31 UTC

[flink] 01/05: [FLINK-13107][table-planner-blink] Introduce fieldNames to TypedFlinkTableFunction to use the specified field names.

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5338ec782839e8bc0142334b9db99a4239954de9
Author: beyond1920 <be...@126.com>
AuthorDate: Mon Jul 8 14:45:02 2019 +0800

    [FLINK-13107][table-planner-blink] Introduce fieldNames to TypedFlinkTableFunction to use the specified field names.
---
 .../flink/table/plan/QueryOperationConverter.java  |   4 +-
 .../functions/utils/UserDefinedFunctionUtils.scala |  12 +-
 .../plan/schema/TypedFlinkTableFunction.scala      |  22 +-
 .../flink/table/plan/util/SetOpRewriteUtil.scala   |   2 +-
 .../apache/flink/table/util/FieldInfoUtils.scala   | 218 ------------
 .../runtime/batch/table/CorrelateITCase.scala      |  23 +-
 .../runtime/stream/table/CorrelateITCase.scala     | 364 +++++++++++++++++++++
 .../apache/flink/table/util/TableTestBase.scala    |  18 +-
 8 files changed, 405 insertions(+), 258 deletions(-)
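
The core change is that TypedFlinkTableFunction now receives the caller-supplied field names (e.g. from the CalculatedQueryOperation's TableSchema) and pairs them with the field types derived from the external result type when building the row type in getRowType(), dropping any surplus types. A minimal, self-contained Scala sketch of that pairing logic follows; it is a simplified illustration only and uses plain strings as stand-ins for the real LogicalType/FlinkTypeFactory APIs:

    // Simplified illustration: strings stand in for the real LogicalType values.
    def buildRowFields(
        fieldNames: Array[String],
        fieldTypes: Array[String]): Seq[(String, String)] = {
      if (fieldTypes.length < fieldNames.length) {
        throw new RuntimeException(
          s"fieldTypes: ${fieldTypes.mkString(", ")}, " +
            s"but fieldNames: ${fieldNames.mkString(", ")}")
      }
      // Keep only as many types as there are names, preserving order.
      fieldNames.zip(fieldTypes.take(fieldNames.length)).toSeq
    }

    // Example: surplus trailing types are dropped, the given names drive the schema.
    // buildRowFields(Array("name", "age"), Array("STRING", "INT", "EXTRA"))
    //   => Seq(("name", "STRING"), ("age", "INT"))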

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
index 12d7038..e05f0ec 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/plan/QueryOperationConverter.java
@@ -266,8 +266,10 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
 		public <U> RelNode visit(CalculatedQueryOperation<U> calculatedTable) {
 			DataType resultType = fromLegacyInfoToDataType(calculatedTable.getResultType());
 			TableFunction<?> tableFunction = calculatedTable.getTableFunction();
+			String[] fieldNames = calculatedTable.getTableSchema().getFieldNames();
+
 			TypedFlinkTableFunction function = new TypedFlinkTableFunction(
-					tableFunction, resultType);
+					tableFunction, fieldNames, resultType);
 
 			FlinkTypeFactory typeFactory = relBuilder.getTypeFactory();
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
index 0da6142..7630fe6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
@@ -22,7 +22,7 @@ package org.apache.flink.table.functions.utils
 import org.apache.flink.api.common.functions.InvalidTypesException
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils._
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.{TableException, ValidationException}
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.dataformat.{BaseRow, BinaryString, Decimal}
 import org.apache.flink.table.functions._
@@ -31,11 +31,12 @@ import org.apache.flink.table.types.ClassDataTypeConverter.fromClassToDataType
 import org.apache.flink.table.types.ClassLogicalTypeConverter.{getDefaultExternalClassForType, getInternalClassForType}
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType}
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.logical.{LogicalType, LogicalTypeRoot, RowType}
 import org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType
+import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.table.typeutils.TypeCheckUtils.isAny
-import org.apache.flink.table.util.FieldInfoUtils
 import org.apache.flink.types.Row
 import org.apache.flink.util.InstantiationUtil
 
@@ -656,10 +657,11 @@ object UserDefinedFunctionUtils {
     */
   def getFieldInfo(inputType: DataType)
     : (Array[String], Array[Int], Array[LogicalType]) = {
+    val inputTypeInfo = fromDataTypeToTypeInfo(inputType)
     (
-      FieldInfoUtils.getFieldNames(inputType),
-      FieldInfoUtils.getFieldIndices(inputType),
-      FieldInfoUtils.getFieldTypes(inputType))
+        FieldInfoUtils.getFieldNames(inputTypeInfo),
+        FieldInfoUtils.getFieldIndices(inputTypeInfo),
+        FieldInfoUtils.getFieldTypes(inputTypeInfo).map(fromTypeInfoToLogicalType))
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
index b0fe4dd..acc1533 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/schema/TypedFlinkTableFunction.scala
@@ -21,7 +21,9 @@ package org.apache.flink.table.plan.schema
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
+import org.apache.flink.table.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
+import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
+import org.apache.flink.table.typeutils.FieldInfoUtils
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 
@@ -34,6 +36,7 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
   */
 class TypedFlinkTableFunction(
     val tableFunction: TableFunction[_],
+    fieldNames: Array[String],
     val externalResultType: DataType)
   extends FlinkTableFunction(tableFunction) {
 
@@ -46,8 +49,19 @@ class TypedFlinkTableFunction(
       typeFactory: RelDataTypeFactory,
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): RelDataType = {
-    // we have determined the row type before, just convert it to RelDataType
-    typeFactory.asInstanceOf[FlinkTypeFactory].createFieldTypeFromLogicalType(
-      fromDataTypeToLogicalType(externalResultType))
+    val fieldTypes = FieldInfoUtils.getFieldTypes(
+      fromDataTypeToTypeInfo(externalResultType)).map(fromTypeInfoToLogicalType)
+    if (fieldTypes.length < fieldNames.length) {
+      throw new RuntimeException(s"fieldTypes: $fieldTypes, but fieldNames: $fieldNames")
+    }
+
+    val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
+    val builder = flinkTypeFactory.builder
+    fieldNames
+        .zip(fieldTypes.dropRight(fieldTypes.length - fieldNames.length))
+        .foreach { f =>
+          builder.add(f._1, flinkTypeFactory.createFieldTypeFromLogicalType(f._2))
+        }
+    builder.build
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
index 5030694..f300645 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/SetOpRewriteUtil.scala
@@ -81,7 +81,7 @@ object SetOpRewriteUtil {
     val fieldTypes = logicalType.getChildren.map(fromLogicalTypeToTypeInfo).toArray
     val tf = new ReplicateRows(fieldTypes)
     val resultType = fromLegacyInfoToDataType(new RowTypeInfo(fieldTypes, fieldNames))
-    val function = new TypedFlinkTableFunction(tf, resultType)
+    val function = new TypedFlinkTableFunction(tf, fieldNames, resultType)
     val typeFactory = builder.getTypeFactory.asInstanceOf[FlinkTypeFactory]
     val sqlFunction = new TableSqlFunction(
       tf.functionIdentifier,
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala
deleted file mode 100644
index 3348bee..0000000
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/util/FieldInfoUtils.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.util
-
-import org.apache.flink.table.api.TableException
-import org.apache.flink.table.dataformat.BaseRow
-import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
-import org.apache.flink.table.types.logical.{LogicalType, RowType, TypeInformationAnyType}
-import org.apache.flink.table.types.{ClassLogicalTypeConverter, DataType}
-import org.apache.flink.types.Row
-
-import java.lang.reflect.Modifier
-
-import scala.collection.JavaConversions._
-
-// TODO remove this, and use org.apache.flink.table.typeutils.FieldInfoUtils
-object FieldInfoUtils {
-
-  /**
-    * Returns field names for a given [[DataType]].
-    *
-    * @param inputType The DataType extract the field names.
-    * @tparam A The type of the DataType.
-    * @return An array holding the field names
-    */
-  def getFieldNames[A](inputType: DataType): Array[String] = {
-    validateType(inputType)
-
-    val fieldNames: Array[String] = fromDataTypeToLogicalType(inputType) match {
-      case t: RowType => t.getFieldNames.toArray(Array[String]())
-      case t => Array("f0")
-    }
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    fieldNames
-  }
-
-  /**
-    * Returns field indexes for a given [[DataType]].
-    *
-    * @param inputType The DataType extract the field positions from.
-    * @return An array holding the field positions
-    */
-  def getFieldIndices(inputType: DataType): Array[Int] = {
-    getFieldNames(inputType).indices.toArray
-  }
-
-  /**
-    * Returns field types for a given [[DataType]].
-    *
-    * @param inputType The DataType to extract field types from.
-    * @return An array holding the field types.
-    */
-  def getFieldTypes(inputType: DataType): Array[LogicalType] = {
-    validateType(inputType)
-
-    fromDataTypeToLogicalType(inputType) match {
-      case t: RowType => t.getChildren.toArray(Array[LogicalType]())
-      case t => Array(t)
-    }
-  }
-
-  /**
-    * Validate if class represented by the typeInfo is static and globally accessible
-    * @param dataType type to check
-    * @throws TableException if type does not meet these criteria
-    */
-  def validateType(dataType: DataType): Unit = {
-    var clazz = dataType.getConversionClass
-    if (clazz == null) {
-      clazz = ClassLogicalTypeConverter.getDefaultExternalClassForType(dataType.getLogicalType)
-    }
-    if ((clazz.isMemberClass && !Modifier.isStatic(clazz.getModifiers)) ||
-      !Modifier.isPublic(clazz.getModifiers) ||
-      clazz.getCanonicalName == null) {
-      throw new TableException(
-        s"Class '$clazz' described in type information '$dataType' must be " +
-          s"static and globally accessible.")
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[DataType]].
-    *
-    * @param inputType The DataType extract the field names and positions from.
-    * @tparam A The type of the DataType.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  protected[flink] def getFieldInfo[A](
-      inputType: DataType): (Array[String], Array[Int]) = {
-
-    val logicalType = fromDataTypeToLogicalType(inputType)
-    logicalType match {
-      case value: TypeInformationAnyType[A]
-        if value.getTypeInformation.getTypeClass == classOf[Row] =>
-        throw new TableException(
-          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-            "Please specify the type of the input with a RowTypeInfo.")
-      case _ =>
-        (getFieldNames(inputType), getFieldIndices(inputType))
-    }
-  }
-
-  /**
-    * Returns field names and field positions for a given [[DataType]] and [[Array]] of
-    * field names. It does not handle time attributes.
-    *
-    * @param inputType The [[DataType]] against which the field names are referenced.
-    * @param fields The fields that define the field names.
-    * @tparam A The type of the DataType.
-    * @return A tuple of two arrays holding the field names and corresponding field positions.
-    */
-  // TODO: we should support Expression fields after we introduce [Expression]
-  // TODO remove this method and use FieldInfoUtils#getFieldsInfo
-  protected[flink] def getFieldInfo[A](
-      inputType: DataType,
-      fields: Array[String]): (Array[String], Array[Int]) = {
-
-    validateType(inputType)
-
-    def referenceByName(name: String, ct: RowType): Option[Int] = {
-      val inputIdx = ct.getFieldIndex(name)
-      if (inputIdx < 0) {
-        throw new TableException(s"$name is not a field of type $ct. " +
-          s"Expected: ${ct.getFieldNames.mkString(", ")}")
-      } else {
-        Some(inputIdx)
-      }
-    }
-
-    val indexedNames: Array[(Int, String)] = fromDataTypeToLogicalType(inputType) match {
-
-      case g: TypeInformationAnyType[A]
-        if g.getTypeInformation.getTypeClass == classOf[Row] ||
-          g.getTypeInformation.getTypeClass == classOf[BaseRow] =>
-        throw new TableException(
-          "An input of GenericTypeInfo<Row> cannot be converted to Table. " +
-            "Please specify the type of the input with a RowTypeInfo.")
-
-      case t: RowType =>
-
-        // determine schema definition mode (by position or by name)
-        val isRefByPos = isReferenceByPosition(t, fields)
-
-        fields.zipWithIndex flatMap { case (name, idx) =>
-          if (name.endsWith("rowtime") || name.endsWith("proctime")) {
-            None
-          } else {
-            if (isRefByPos) {
-              Some((idx, name))
-            } else {
-              referenceByName(name, t).map((_, name))
-            }
-          }
-        }
-
-      case _ => // atomic or other custom type information
-        if (fields.length > 1) {
-          // only accept the first field for an atomic type
-          throw new TableException("Only accept one field to reference an atomic type.")
-        }
-        // first field reference is mapped to atomic type
-        Array((0, fields(0)))
-    }
-
-    val (fieldIndexes, fieldNames) = indexedNames.unzip
-
-    if (fieldNames.contains("*")) {
-      throw new TableException("Field name can not be '*'.")
-    }
-
-    (fieldNames, fieldIndexes)
-  }
-
-  /**
-    * Reference input fields by name:
-    * All fields in the schema definition are referenced by name
-    * (and possibly renamed using an alias (as). In this mode, fields can be reordered and
-    * projected out. Moreover, we can define proctime and rowtime attributes at arbitrary
-    * positions using arbitrary names (except those that exist in the result schema). This mode
-    * can be used for any input type, including POJOs.
-    *
-    * Reference input fields by position:
-    * In this mode, fields are simply renamed. Event-time attributes can
-    * replace the field on their position in the input data (if it is of correct type) or be
-    * appended at the end. Proctime attributes must be appended at the end. This mode can only be
-    * used if the input type has a defined field order (tuple, case class, Row) and no of fields
-    * references a field of the input type.
-    */
-  // TODO: we should support Expression fields after we introduce [Expression]
-  protected def isReferenceByPosition(ct: RowType, fields: Array[String]): Boolean = {
-    val inputNames = ct.getFieldNames
-
-    // Use the by-position mode if no of the fields exists in the input.
-    // This prevents confusing cases like ('f2, 'f0, 'myName) for a Tuple3 where fields are renamed
-    // by position but the user might assume reordering instead of renaming.
-    fields.forall(!inputNames.contains(_))
-  }
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
index 0f5b954..61132be 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/table/CorrelateITCase.scala
@@ -35,8 +35,6 @@ import java.sql.{Date, Timestamp}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-// TODO Enable after TypedFlinkTableFunction take fieldNames into consideration
-@Ignore
 class CorrelateITCase extends BatchTestBase {
 
   @Test
@@ -179,6 +177,8 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO
+  @Ignore("Type question, should be fixed later.")
   @Test
   def testUserDefinedTableFunctionWithScalarFunctionInCondition(): Unit = {
     val in = testData.as('a, 'b, 'c)
@@ -291,6 +291,8 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
+  // TODO
+  @Ignore("Add a rule to translate a Correlate without correlateSets to Join!")
   @Test
   def testTableFunctionWithVariableArguments(): Unit = {
     val varArgsFunc0 = new VarArgsFunc0
@@ -351,21 +353,8 @@ class CorrelateITCase extends BatchTestBase {
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-//  @Test
-//  def testCorrelateAfterConcatAggWithConstantParam(): Unit = {
-//    val in = testData.as('a, 'b, 'c)
-//    val in2 = testData.as('a, 'b, 'c)
-//    val func0 = new TableFunc0
-//    val left = in.select('c.concat_agg("#") as 'd)
-//    val result = in2.join(left).as('a, 'b, 'c, 'd)
-//      .joinLateral(func0('c) as ('name, 'age))
-//      .select('a, 'c, 'name, 'age)
-//
-//    val results = executeQuery(result)
-//    val expected = "1,Jack#22,Jack,22\n2,John#19,John,19\n3,Anna#44,Anna,44"
-//    TestBaseUtils.compareResultAsText(results.asJava, expected)
-//  }
-
+  // TODO
+  @Ignore("Type question, should be fixed later.")
   @Test
   def testTableFunctionCollectorOpenClose(): Unit = {
     val t = testData.as('a, 'b, 'c)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
new file mode 100644
index 0000000..a42e255
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/table/CorrelateITCase.scala
@@ -0,0 +1,364 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.expressions.utils.{Func18, FuncWithOpen, RichFunc2}
+import org.apache.flink.table.runtime.utils.StreamingWithStateTestBase.StateBackendMode
+import org.apache.flink.table.runtime.utils.TestData._
+import org.apache.flink.table.runtime.utils._
+import org.apache.flink.table.util.{PojoTableFunc, RF, RichTableFunc1, TableFunc0, TableFunc2, TableFunc3, TableFunc6, TableFunc7, VarArgsFunc0}
+import org.apache.flink.types.Row
+
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import java.lang.{Boolean => JBoolean}
+
+import scala.collection.mutable
+
+@RunWith(classOf[Parameterized])
+class CorrelateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase(mode) {
+
+  @Test
+  def testCrossJoin(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+    val pojoFunc0 = new PojoTableFunc()
+
+    val result = t
+      .joinLateral(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .joinLateral(pojoFunc0('c))
+      .where('age > 20)
+      .select('c, 'name, 'age)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList("Jack#22,Jack,22", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testLeftOuterJoinWithoutPredicates(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .leftOuterJoinLateral(func0('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "nosharp,null,null", "Jack#22,Jack,22",
+      "John#19,John,19", "Anna#44,Anna,44")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  /**
+    * Common join predicates are temporarily forbidden (see FLINK-7865).
+    */
+  @Test (expected = classOf[ValidationException])
+  def testLeftOuterJoinWithPredicates(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .leftOuterJoinLateral(func0('c) as ('s, 'l), 'a === 'l)
+      .select('c, 's, 'l)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = "John#19,null,null\n" + "John#22,null,null\n" + "Anna44,null,null\n" +
+      "nosharp,null,null"
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedTableFunctionWithScalarFunction(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    val result = t
+      .joinLateral(func0('c) as('d, 'e))
+      .where(Func18('d, "J"))
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList("Jack#22,Jack,22", "John#19,John,19")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedTableFunctionWithParameter(): Unit = {
+    val tableFunc1 = new RichTableFunc1
+    tEnv.registerFunction("RichTableFunc1", tableFunc1)
+    UserDefinedFunctionTestUtils.setJobParameters(env, Map("word_separator" -> " "))
+
+    val result = failingDataSource(smallTupleData3)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .joinLateral(tableFunc1('c) as 's)
+      .select('a, 's)
+
+    val sink = new TestingAppendSink
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,Hello", "3,world")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testUserDefinedTableFunctionWithUserDefinedScalarFunction(): Unit = {
+    val tableFunc1 = new RichTableFunc1
+    val richFunc2 = new RichFunc2
+    tEnv.registerFunction("RichTableFunc1", tableFunc1)
+    tEnv.registerFunction("RichFunc2", richFunc2)
+    UserDefinedFunctionTestUtils.setJobParameters(
+      env,
+      Map("word_separator" -> "#", "string.value" -> "test"))
+
+    val result = failingDataSource(smallTupleData3)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .joinLateral(tableFunc1(richFunc2('c)) as 's)
+      .select('a, 's)
+
+    val sink = new TestingAppendSink
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,Hi",
+      "1,test",
+      "2,Hello",
+      "2,test",
+      "3,Hello world",
+      "3,test")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testTableFunctionConstructorWithParams(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val config = Map("key1" -> "value1", "key2" -> "value2")
+    val func30 = new TableFunc3(null)
+    val func31 = new TableFunc3("OneConf_")
+    val func32 = new TableFunc3("TwoConf_", config)
+
+    val result = t
+      .joinLateral(func30('c) as('d, 'e))
+      .select('c, 'd, 'e)
+      .joinLateral(func31('c) as ('f, 'g))
+      .select('c, 'd, 'e, 'f, 'g)
+      .joinLateral(func32('c) as ('h, 'i))
+      .select('c, 'd, 'f, 'h, 'e, 'g, 'i)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key1_value=value1_Anna,44,44,44",
+      "Anna#44,Anna,OneConf_Anna,TwoConf__key=key2_value=value2_Anna,44,44,44",
+      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key1_value=value1_Jack,22,22,22",
+      "Jack#22,Jack,OneConf_Jack,TwoConf__key=key2_value=value2_Jack,22,22,22",
+      "John#19,John,OneConf_John,TwoConf__key=key1_value=value1_John,19,19,19",
+      "John#19,John,OneConf_John,TwoConf__key=key2_value=value2_John,19,19,19"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testTableFunctionWithVariableArguments(): Unit = {
+    val varArgsFunc0 = new VarArgsFunc0
+    tEnv.registerFunction("VarArgsFunc0", varArgsFunc0)
+
+    val result = testData(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('c)
+      .joinLateral(varArgsFunc0("1", "2", 'c))
+
+    val sink = new TestingAppendSink
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Anna#44,1",
+      "Anna#44,2",
+      "Anna#44,Anna#44",
+      "Jack#22,1",
+      "Jack#22,2",
+      "Jack#22,Jack#22",
+      "John#19,1",
+      "John#19,2",
+      "John#19,John#19",
+      "nosharp,1",
+      "nosharp,2",
+      "nosharp,nosharp")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowType(): Unit = {
+    val row = Row.of(
+      12.asInstanceOf[Integer],
+      true.asInstanceOf[JBoolean],
+      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+    )
+
+    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+    val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+
+    val tableFunc = new TableFunc6()
+    val result = in
+      .joinLateral(tableFunc('c) as ('f0, 'f1, 'f2))
+      .select('c, 'f2)
+
+    val sink = new TestingAppendSink
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,2,3,3",
+      "1,2,3,3")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testTableFunctionCollectorOpenClose(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+    val func26 = new FuncWithOpen
+    tEnv.registerFunction("func26", func26)
+    val result = t
+      .joinLateral(func0('c) as('d, 'e))
+      .where(func26('e))
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    val expected = Seq (
+      "Jack#22,Jack,22",
+      "John#19,John,19",
+      "Anna#44,Anna,44"
+    )
+
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testTableFunctionCollectorInit(): Unit = {
+    val t = testData(env).toTable(tEnv).as('a, 'b, 'c)
+    val func0 = new TableFunc0
+
+    // this case will generate 'timestamp' member field and 'DateFormatter'
+    val result = t
+      .joinLateral(func0('c) as('d, 'e))
+      .where(dateFormat(currentTimestamp(), "yyyyMMdd") === 'd)
+      .select('c, 'd, 'e)
+      .toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    result.addSink(sink)
+    env.execute()
+
+    assertTrue(sink.getAppendResults.isEmpty)
+  }
+
+  @Test
+  def testFlatMap(): Unit = {
+    val func2 = new TableFunc2
+    val ds = testData(env).toTable(tEnv, 'a, 'b, 'c)
+      // test non alias
+      .flatMap(func2('c))
+      .select('f0, 'f1)
+      // test the output field name of flatMap is the same as the field name of the input table
+      .flatMap(func2(concat('f0, "#")))
+      .as ('f0, 'f1)
+      .select('f0, 'f1)
+
+    val sink = new TestingAppendSink
+    ds.toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Jack,4",
+      "22,2",
+      "John,4",
+      "19,2",
+      "Anna,4",
+      "44,2")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testMultipleEvals(): Unit = {
+    val rf = new RF
+    val tf = new TableFunc7
+
+    val row = Row.of(
+      12.asInstanceOf[Integer],
+      true.asInstanceOf[JBoolean],
+      Row.of(1.asInstanceOf[Integer], 2.asInstanceOf[Integer], 3.asInstanceOf[Integer])
+    )
+
+    val rowType = Types.ROW(Types.INT, Types.BOOLEAN, Types.ROW(Types.INT, Types.INT, Types.INT))
+    val in = env.fromElements(row, row)(rowType).toTable(tEnv).as('a, 'b, 'c)
+    val result = in.select(rf('a) as 'd).joinLateral(tf('d) as 'e)
+
+    val sink = new TestingAppendSink
+
+    result.toAppendStream[Row].addSink(sink)
+    env.execute()
+  }
+
+  private def testData(
+      env: StreamExecutionEnvironment)
+    : DataStream[(Int, Long, String)] = {
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "Jack#22"))
+    data.+=((2, 2L, "John#19"))
+    data.+=((3, 2L, "Anna#44"))
+    data.+=((4, 3L, "nosharp"))
+    env.fromCollection(data)
+  }
+}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
index a59622f..fcbc4c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/util/TableTestBase.scala
@@ -18,9 +18,8 @@
 package org.apache.flink.table.util
 
 import org.apache.flink.api.common.JobExecutionResult
-import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation}
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
-import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment => ScalaStreamExecEnv}
@@ -50,6 +49,7 @@ import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.table.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.types.logical.LogicalType
+import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
 
 import org.apache.calcite.rel.RelNode
@@ -171,13 +171,7 @@ abstract class TableTestUtilBase(test: TableTestBase, isBatch: Boolean) {
     */
   def addTableSource[T: TypeInformation](name: String, fields: Expression*): Table = {
     val typeInfo: TypeInformation[T] = implicitly[TypeInformation[T]]
-    val fieldTypes: Array[TypeInformation[_]] = typeInfo match {
-      case tt: TupleTypeInfo[_] => (0 until tt.getArity).map(tt.getTypeAt).toArray
-      case ct: CaseClassTypeInfo[_] => (0 until ct.getArity).map(ct.getTypeAt).toArray
-      case at: AtomicType[_] => Array[TypeInformation[_]](at)
-      case _ => throw new TableException(s"Unsupported type info: $typeInfo")
-    }
-    val fieldsInfo = org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(
+    val fieldsInfo = FieldInfoUtils.getFieldsInfo(
       typeInfo, fields.toArray)
     addTableSource(name, new TestTableSource(isBatch, fieldsInfo.toTableSchema))
   }
@@ -1011,7 +1005,7 @@ object TableTestUtil {
     val streamType = dataStream.getType
     // get field names and types for all non-replaced fields
     val typeInfoSchema = fields.map((f: Array[Expression]) => {
-      val fieldsInfo = org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(streamType, f)
+      val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f)
       // check if event-time is enabled
       if (fieldsInfo.isRowtimeDefined &&
         (execEnv.getStreamTimeCharacteristic ne TimeCharacteristic.EventTime)) {
@@ -1021,7 +1015,7 @@ object TableTestUtil {
           execEnv.getStreamTimeCharacteristic))
       }
       fieldsInfo
-    }).getOrElse(org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo(streamType))
+    }).getOrElse(FieldInfoUtils.getFieldsInfo(streamType))
 
     val fieldCnt = typeInfoSchema.getFieldTypes.length
     val dataStreamQueryOperation = new DataStreamQueryOperation(