You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/07 15:51:18 UTC

[flink] 02/05: [FLINK-13225][table-planner-blink] Fix type inference for hive udtf

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2c110254b6a0b709ca41fe9c819298516dccbc5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:21:20 2019 +0800

    [FLINK-13225][table-planner-blink] Fix type inference for hive udtf
---
 .../catalog/FunctionCatalogOperatorTable.java      |  21 +-
 .../functions/utils/HiveTableSqlFunction.java      | 235 +++++++++++++++++++++
 .../planner/plan/QueryOperationConverter.java      |   3 +-
 .../planner/codegen/CorrelateCodeGenerator.scala   |   8 +-
 .../table/planner/codegen/ExprCodeGenerator.scala  |   4 +-
 .../planner/functions/utils/TableSqlFunction.scala | 112 +++++-----
 .../functions/utils/UserDefinedFunctionUtils.scala |   2 +-
 .../logical/FlinkLogicalTableFunctionScan.scala    |   2 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |   4 +-
 .../schema/DeferredTypeFlinkTableFunction.scala    |   4 +-
 .../planner/plan/schema/FlinkTableFunction.scala   |   1 +
 .../plan/schema/TypedFlinkTableFunction.scala      |   2 +
 .../utils/UserDefinedFunctionTestUtils.scala       |   2 +-
 13 files changed, 336 insertions(+), 64 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index ddf8f60..bc60f27 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.FunctionLookup;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -27,8 +28,12 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition;
 import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
+import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
 
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -42,6 +47,7 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
@@ -108,7 +114,20 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
 		} else if (functionDefinition instanceof TableFunctionDefinition &&
 				category != null &&
 				category.isTableFunction()) {
-			return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+			TableFunctionDefinition def = (TableFunctionDefinition) functionDefinition;
+			if (isHiveFunc(def.getTableFunction())) {
+				DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
+				return Optional.of(new HiveTableSqlFunction(
+						name,
+						name,
+						def.getTableFunction(),
+						returnType,
+						typeFactory,
+						new DeferredTypeFlinkTableFunction(def.getTableFunction(), returnType),
+						HiveTableSqlFunction.operandTypeChecker(name, def.getTableFunction())));
+			} else {
+				return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+			}
 		}
 
 		return Optional.empty();
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
new file mode 100644
index 0000000..6800fb9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple3;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
+/**
+ * Hive {@link TableSqlFunction}.
+ * Overrides makeFunction to clone the function and invoke {@code HiveGenericUDTF#setArgumentTypesAndConstants}.
+ * Overrides getRowType to invoke {@code HiveGenericUDTF#getHiveResultType} instead of
+ * {@code HiveGenericUDTF#getResultType(Object[], Class[])}.
+ *
+ * @deprecated TODO hack code, its logic should be integrated into TableSqlFunction
+ */
+@Deprecated
+public class HiveTableSqlFunction extends TableSqlFunction {
+
+	private final TableFunction hiveUdtf;
+	private final HiveOperandTypeChecker operandTypeChecker;
+
+	public HiveTableSqlFunction(String name, String displayName,
+			TableFunction hiveUdtf,
+			DataType implicitResultType,
+			FlinkTypeFactory typeFactory,
+			FlinkTableFunction functionImpl,
+			HiveOperandTypeChecker operandTypeChecker) {
+		super(name, displayName, hiveUdtf, implicitResultType, typeFactory, functionImpl, scala.Option.apply(operandTypeChecker));
+		this.hiveUdtf = hiveUdtf; // kept as the template that makeFunction clones
+		this.operandTypeChecker = operandTypeChecker; // getRowType reads its previousArgTypes
+	}
+
+	@Override
+	public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+		TableFunction clone;
+		try {
+			clone = InstantiationUtil.clone(hiveUdtf); // copy, so hiveUdtf itself is not handed to invokeSetArgs
+		} catch (IOException | ClassNotFoundException e) {
+			throw new RuntimeException(e); // cloning failures surface as unchecked, cause preserved
+		}
+		return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
+	}
+
+	@Override
+	public RelDataType getRowType(RelDataTypeFactory typeFactory, List<SqlNode> operandList) {
+		Preconditions.checkNotNull(operandTypeChecker.previousArgTypes); // assigned in checkOperandTypes
+		FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
+		Object[] arguments = convertArguments( // literal operands as Java values, null for non-literals
+				Arrays.stream(operandTypeChecker.previousArgTypes)
+						.map(factory::createFieldTypeFromLogicalType)
+						.collect(Collectors.toList()),
+				operandList);
+		DataType resultType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(
+				invokeGetResultType(hiveUdtf, arguments, operandTypeChecker.previousArgTypes, (FlinkTypeFactory) typeFactory)));
+		Tuple3<String[], int[], LogicalType[]> fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType);
+		return buildRelDataType(typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2()); // _1 = names, _2 = indexes
+	}
+
+	/**
+	 * This method is copied from Calcite and modified so that it does not rely on Function.
+	 * TODO FlinkTableFunction needs to implement getElementType.
+	 */
+	private static Object[] convertArguments(
+			List<RelDataType> operandTypes,
+			List<SqlNode> operandList) {
+		List<Object> arguments = new ArrayList<>(operandList.size());
+		// Build the argument list; an operand that is not a literal contributes null.
+		for (Pair<RelDataType, SqlNode> pair
+				: Pair.zip(operandTypes, operandList)) {
+			try {
+				final Object o = getValue(pair.right);
+				final Object o2 = coerce(o, pair.left);
+				arguments.add(o2);
+			} catch (NonLiteralException e) {
+				arguments.add(null);
+			}
+		}
+		return arguments.toArray();
+	}
+
+	/** Converts a literal's value to the Java object used for the given SQL type; null stays null. */
+	private static Object coerce(Object value, RelDataType type) {
+		if (value == null) {
+			return null;
+		}
+		switch (type.getSqlTypeName()) {
+			case CHAR:
+				return ((NlsString) value).getValue();
+			case BINARY:
+				return ((BitString) value).getAsByteArray();
+			case DECIMAL:
+				return value;
+			case BIGINT:
+				return ((BigDecimal) value).longValue();
+			case INTEGER:
+				return ((BigDecimal) value).intValue();
+			case SMALLINT:
+				return ((BigDecimal) value).shortValue();
+			case TINYINT:
+				return ((BigDecimal) value).byteValue();
+			case DOUBLE:
+				return ((BigDecimal) value).doubleValue();
+			case REAL:
+			case FLOAT:
+				return ((BigDecimal) value).floatValue();
+			case DATE:
+				return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
+			case TIME:
+				return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000L); // long math: int multiply overflows
+			case TIMESTAMP:
+				return SqlDateTimeUtils.unixTimestampToLocalDateTime(((TimestampString) value).getMillisSinceEpoch());
+			default:
+				throw new RuntimeException("Not support type: " + type);
+		}
+	}
+
+	private static Object getValue(SqlNode right) throws NonLiteralException {
+		switch (right.getKind()) {
+			case ARRAY_VALUE_CONSTRUCTOR:
+				final List<Object> list = new ArrayList<>();
+				for (SqlNode o : ((SqlCall) right).getOperandList()) {
+					list.add(getValue(o)); // elements are resolved recursively
+				}
+				return ImmutableNullableList.copyOf(list).toArray();
+			case MAP_VALUE_CONSTRUCTOR:
+				final Map<Object, Object> map = new HashMap<>();
+				final List<SqlNode> operands = ((SqlCall) right).getOperandList();
+				for (int i = 0; i < operands.size(); i += 2) { // operands alternate key, value
+					final SqlNode key = operands.get(i);
+					final SqlNode value = operands.get(i + 1);
+					map.put(getValue(key), getValue(value));
+				}
+				return map;
+			default:
+				if (SqlUtil.isNullLiteral(right, true)) {
+					return null;
+				}
+				if (SqlUtil.isLiteral(right)) {
+					return ((SqlLiteral) right).getValue();
+				}
+				if (right.getKind() == SqlKind.DEFAULT) {
+					return null; // currently NULL is the only default value
+				}
+				throw new NonLiteralException(); // signals to the caller that this operand is not a literal
+		}
+	}
+
+	/** Thrown by getValue when an argument of the table function call is not a
+	 * literal. */
+	private static class NonLiteralException extends Exception {
+	}
+
+	public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) { // factory: the checker's constructor is private
+		return new HiveOperandTypeChecker(name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
+	}
+
+	/**
+	 * Checker that remembers the operand types of its latest check in previousArgTypes.
+	 *
+	 * @deprecated TODO hack code, should modify calcite getRowType to pass operand types
+	 */
+	@Deprecated
+	public static class HiveOperandTypeChecker extends OperandTypeChecker {
+
+		private LogicalType[] previousArgTypes; // null until checkOperandTypes has run once
+
+		private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
+			super(name, udtf, methods);
+		}
+
+		@Override
+		public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+			previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding); // recorded before delegating to super
+			return super.checkOperandTypes(callBinding, throwOnFailure);
+		}
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 2a2119d..e0f4763 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -279,7 +279,8 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
 					tableFunction,
 					resultType,
 					typeFactory,
-					function);
+					function,
+					scala.Option.empty());
 
 			List<RexNode> parameters = convertToRexNodes(calculatedTable.getParameters());
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index cf861bf..c292c64 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -69,15 +69,19 @@ object CorrelateCodeGenerator {
     val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
     // we need result Type to do code generation
     val arguments = UserDefinedFunctionUtils.transformRexNodes(rexCall.operands)
+    val operandTypes = rexCall.operands
+        .map(_.getType)
+        .map(FlinkTypeFactory.toLogicalType).toArray
+    val func = sqlFunction.makeFunction(arguments, operandTypes)
     val argTypes = getEvalMethodSignature(
-      sqlFunction.getTableFunction,
+      func,
       rexCall.operands
         .map(_.getType)
         .map(FlinkTypeFactory.toLogicalType).toArray)
     val udtfExternalType = sqlFunction
         .getFunction
         .asInstanceOf[FlinkTableFunction]
-        .getExternalResultType(arguments, argTypes)
+        .getExternalResultType(func, arguments, argTypes)
     val pojoFieldMapping = Some(UserDefinedFunctionUtils.getFieldInfo(udtfExternalType)._2)
     val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType)
     val (returnType, swallowInputOnly ) = if (projectProgram.isDefined) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 7c55d73..3a05fe5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -737,7 +737,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
             .generate(ctx, operands, resultType)
 
       case tsf: TableSqlFunction =>
-        new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType)
+        new TableFunctionCallGen(
+          tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
+            .generate(ctx, operands, resultType)
 
       // advanced scalar functions
       case sqlOperator: SqlOperator =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index 4bf554a..c3f5ac3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.planner.plan.schema.FlinkTableFunction
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
 import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.LogicalType
 
 import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
 import org.apache.calcite.sql._
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.`type`._
 import org.apache.calcite.sql.parser.SqlParserPos
 import org.apache.calcite.sql.validate.{SqlUserDefinedTableFunction, SqlUserDefinedTableMacro}
 
+import java.lang.reflect.Method
 import java.util
 
 /**
@@ -49,22 +51,26 @@ import java.util
 class TableSqlFunction(
     name: String,
     displayName: String,
-    udtf: TableFunction[_],
+    val udtf: TableFunction[_],
     implicitResultType: DataType,
     typeFactory: FlinkTypeFactory,
-    functionImpl: FlinkTableFunction)
+    functionImpl: FlinkTableFunction,
+    operandTypeInfer: Option[SqlOperandTypeChecker] = None)
   extends SqlUserDefinedTableFunction(
     new SqlIdentifier(name, SqlParserPos.ZERO),
     ReturnTypes.CURSOR,
+    // type inference has the UNKNOWN operand types.
     createOperandTypeInference(name, udtf, typeFactory),
-    createOperandTypeChecker(name, udtf),
+    // only checker has the real operand types.
+    operandTypeInfer.getOrElse(createOperandTypeChecker(name, udtf)),
     null,
     functionImpl) {
 
   /**
     * Get the user-defined table function.
     */
-  def getTableFunction = udtf
+  def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
+    udtf
 
   /**
     * Get the type information of the table returned by the table function.
@@ -131,61 +137,61 @@ object TableSqlFunction {
   private[flink] def createOperandTypeChecker(
       name: String,
       udtf: TableFunction[_]): SqlOperandTypeChecker = {
+    new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval"))
+  }
+}
 
-    val methods = checkAndExtractMethods(udtf, "eval")
+/**
+  * Operand type checker based on [[TableFunction]] given information.
+  */
+class OperandTypeChecker(
+    name: String, udtf: TableFunction[_], methods: Array[Method]) extends SqlOperandTypeChecker {
 
-    /**
-      * Operand type checker based on [[TableFunction]] given information.
-      */
-    new SqlOperandTypeChecker {
-      override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
-        s"$opName[${signaturesToString(udtf, "eval")}]"
-      }
+  override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+    s"$opName[${signaturesToString(udtf, "eval")}]"
+  }
 
-      override def getOperandCountRange: SqlOperandCountRange = {
-        var min = 254
-        var max = -1
-        var isVarargs = false
-        methods.foreach(m => {
-          var len = m.getParameterTypes.length
-          if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
-            isVarargs = true
-            len = len - 1
-          }
-          max = Math.max(len, max)
-          min = Math.min(len, min)
-        })
-        if (isVarargs) {
-          // if eval method is varargs, set max to -1 to skip length check in Calcite
-          max = -1
-        }
-        SqlOperandCountRanges.between(min, max)
+  override def getOperandCountRange: SqlOperandCountRange = {
+    var min = 254
+    var max = -1
+    var isVarargs = false
+    methods.foreach(m => {
+      var len = m.getParameterTypes.length
+      if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
+        isVarargs = true
+        len = len - 1
       }
+      max = Math.max(len, max)
+      min = Math.min(len, min)
+    })
+    if (isVarargs) {
+      // if eval method is varargs, set max to -1 to skip length check in Calcite
+      max = -1
+    }
+    SqlOperandCountRanges.between(min, max)
+  }
 
-      override def checkOperandTypes(
-          callBinding: SqlCallBinding,
-          throwOnFailure: Boolean)
-        : Boolean = {
-        val operandTypes = getOperandType(callBinding)
-
-        if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
-          if (throwOnFailure) {
-            throw new ValidationException(
-              s"Given parameters of function '$name' do not match any signature. \n" +
-                  s"Actual: ${signatureInternalToString(operandTypes)} \n" +
-                  s"Expected: ${signaturesToString(udtf, "eval")}")
-          } else {
-            false
-          }
-        } else {
-          true
-        }
+  override def checkOperandTypes(
+      callBinding: SqlCallBinding,
+      throwOnFailure: Boolean)
+  : Boolean = {
+    val operandTypes = getOperandType(callBinding)
+
+    if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
+      if (throwOnFailure) {
+        throw new ValidationException(
+          s"Given parameters of function '$name' do not match any signature. \n" +
+              s"Actual: ${signatureInternalToString(operandTypes)} \n" +
+              s"Expected: ${signaturesToString(udtf, "eval")}")
+      } else {
+        false
       }
-
-      override def isOptional(i: Int): Boolean = false
-
-      override def getConsistency: Consistency = Consistency.NONE
-
+    } else {
+      true
     }
   }
+
+  override def isOptional(i: Int): Boolean = false
+
+  override def getConsistency: Consistency = Consistency.NONE
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index e565a6e..3552a7f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -802,7 +802,7 @@ object UserDefinedFunctionUtils {
     }.toArray
   }
 
-  private[table] def buildRelDataType(
+  def buildRelDataType(
       typeFactory: RelDataTypeFactory,
       resultType: LogicalType,
       fieldNames: Array[String],
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index e2ab608..38f4b03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -100,7 +100,7 @@ class FlinkLogicalTableFunctionScanConverter
       return false
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
-    tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+    tableFunction.udtf.isInstanceOf[TemporalTableFunction]
   }
 
   def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index a240b64..62a4872 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -182,11 +182,11 @@ class GetTemporalTableFunctionCall(
     }
     val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
 
-    if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) {
+    if (!tableFunction.udtf.isInstanceOf[TemporalTableFunction]) {
       return null
     }
     val temporalTableFunction =
-      tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl]
+      tableFunction.udtf.asInstanceOf[TemporalTableFunctionImpl]
 
     checkState(
       rexCall.getOperands.size().equals(1),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
index 9bf8221..6f467d4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -39,6 +40,7 @@ class DeferredTypeFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType = {
     // TODO
@@ -56,7 +58,7 @@ class DeferredTypeFlinkTableFunction(
       typeFactory: RelDataTypeFactory,
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): RelDataType = {
-    val resultType = getExternalResultType(arguments, argTypes)
+    val resultType = getExternalResultType(tableFunction, arguments, argTypes)
     val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
     UserDefinedFunctionUtils.buildRelDataType(
       typeFactory, fromDataTypeToLogicalType(resultType), fieldNames, fieldIndexes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
index 73f5e63..6ecb8e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
@@ -51,6 +51,7 @@ abstract class FlinkTableFunction(
     * Returns the Type for usage, i.e. code generation.
     */
   def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
index 773af90..828a4b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.plan.schema
 
+import org.apache.flink.table.functions
 import org.apache.flink.table.functions.TableFunction
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -41,6 +42,7 @@ class TypedFlinkTableFunction(
   extends FlinkTableFunction(tableFunction) {
 
   override def getExternalResultType(
+      tableFunction: functions.TableFunction[_],
       arguments: Array[AnyRef],
       argTypes: Array[Class[_]]): DataType =
     externalResultType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
index 432f0a2..bd69126 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -334,7 +334,7 @@ object UserDefinedFunctionTestUtils {
       }
       a + b
     }
-    
+
     def eval(a: Long, b: Int): Long = {
       eval(a, b.asInstanceOf[Long])
     }