You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/08/07 15:51:18 UTC
[flink] 02/05: [FLINK-13225][table-planner-blink] Fix type
inference for hive udtf
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2c110254b6a0b709ca41fe9c819298516dccbc5
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Sun Jul 28 20:21:20 2019 +0800
[FLINK-13225][table-planner-blink] Fix type inference for hive udtf
---
.../catalog/FunctionCatalogOperatorTable.java | 21 +-
.../functions/utils/HiveTableSqlFunction.java | 235 +++++++++++++++++++++
.../planner/plan/QueryOperationConverter.java | 3 +-
.../planner/codegen/CorrelateCodeGenerator.scala | 8 +-
.../table/planner/codegen/ExprCodeGenerator.scala | 4 +-
.../planner/functions/utils/TableSqlFunction.scala | 112 +++++-----
.../functions/utils/UserDefinedFunctionUtils.scala | 2 +-
.../logical/FlinkLogicalTableFunctionScan.scala | 2 +-
...relateToJoinFromTemporalTableFunctionRule.scala | 4 +-
.../schema/DeferredTypeFlinkTableFunction.scala | 4 +-
.../planner/plan/schema/FlinkTableFunction.scala | 1 +
.../plan/schema/TypedFlinkTableFunction.scala | 2 +
.../utils/UserDefinedFunctionTestUtils.scala | 2 +-
13 files changed, 336 insertions(+), 64 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index ddf8f60..bc60f27 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,6 +19,7 @@
package org.apache.flink.table.planner.catalog;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.FunctionLookup;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
@@ -27,8 +28,12 @@ import org.apache.flink.table.functions.ScalarFunctionDefinition;
import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.functions.utils.HiveScalarSqlFunction;
+import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
+import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
+import org.apache.flink.types.Row;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -42,6 +47,7 @@ import java.util.List;
import java.util.Optional;
import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
+import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
/**
* Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}.
@@ -108,7 +114,20 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
} else if (functionDefinition instanceof TableFunctionDefinition &&
category != null &&
category.isTableFunction()) {
- return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+ TableFunctionDefinition def = (TableFunctionDefinition) functionDefinition;
+ if (isHiveFunc(def.getTableFunction())) {
+ DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
+ return Optional.of(new HiveTableSqlFunction(
+ name,
+ name,
+ def.getTableFunction(),
+ returnType,
+ typeFactory,
+ new DeferredTypeFlinkTableFunction(def.getTableFunction(), returnType),
+ HiveTableSqlFunction.operandTypeChecker(name, def.getTableFunction())));
+ } else {
+ return convertTableFunction(name, (TableFunctionDefinition) functionDefinition);
+ }
}
return Optional.empty();
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
new file mode 100644
index 0000000..6800fb9
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
+import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.DateString;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.TimeString;
+import org.apache.calcite.util.TimestampString;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.time.LocalTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.Tuple3;
+
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
+import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
+import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
+
+/**
+ * Hive {@link TableSqlFunction}.
+ * Override getFunction to clone function and invoke {@code HiveGenericUDTF#setArgumentTypesAndConstants}.
+ * Override SqlReturnTypeInference to invoke {@code HiveGenericUDTF#getHiveResultType} instead of
+ * {@code HiveGenericUDTF#getResultType(Object[], Class[])}.
+ *
+ * @deprecated TODO hack code, its logical should be integrated to TableSqlFunction
+ */
+@Deprecated
+public class HiveTableSqlFunction extends TableSqlFunction {
+
+ private final TableFunction hiveUdtf;
+ private final HiveOperandTypeChecker operandTypeChecker;
+
+ public HiveTableSqlFunction(String name, String displayName,
+ TableFunction hiveUdtf,
+ DataType implicitResultType,
+ FlinkTypeFactory typeFactory,
+ FlinkTableFunction functionImpl,
+ HiveOperandTypeChecker operandTypeChecker) {
+ super(name, displayName, hiveUdtf, implicitResultType, typeFactory, functionImpl, scala.Option.apply(operandTypeChecker));
+ this.hiveUdtf = hiveUdtf;
+ this.operandTypeChecker = operandTypeChecker;
+ }
+
+ @Override
+ public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
+ TableFunction clone;
+ try {
+ clone = InstantiationUtil.clone(hiveUdtf);
+ } catch (IOException | ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
+ }
+
+ @Override
+ public RelDataType getRowType(RelDataTypeFactory typeFactory, List<SqlNode> operandList) {
+ Preconditions.checkNotNull(operandTypeChecker.previousArgTypes);
+ FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
+ Object[] arguments = convertArguments(
+ Arrays.stream(operandTypeChecker.previousArgTypes)
+ .map(factory::createFieldTypeFromLogicalType)
+ .collect(Collectors.toList()),
+ operandList);
+ DataType resultType = fromLogicalTypeToDataType(FlinkTypeFactory.toLogicalType(
+ invokeGetResultType(hiveUdtf, arguments, operandTypeChecker.previousArgTypes, (FlinkTypeFactory) typeFactory)));
+ Tuple3<String[], int[], LogicalType[]> fieldInfo = UserDefinedFunctionUtils.getFieldInfo(resultType);
+ return buildRelDataType(typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2());
+ }
+
+ /**
+ * This method is copied from calcite, and modify it to not rely on Function.
+ * TODO FlinkTableFunction need implement getElementType.
+ */
+ private static Object[] convertArguments(
+ List<RelDataType> operandTypes,
+ List<SqlNode> operandList) {
+ List<Object> arguments = new ArrayList<>(operandList.size());
+ // Construct a list of arguments, if they are all constants.
+ for (Pair<RelDataType, SqlNode> pair
+ : Pair.zip(operandTypes, operandList)) {
+ try {
+ final Object o = getValue(pair.right);
+ final Object o2 = coerce(o, pair.left);
+ arguments.add(o2);
+ } catch (NonLiteralException e) {
+ arguments.add(null);
+ }
+ }
+ return arguments.toArray();
+ }
+
+ private static Object coerce(Object value, RelDataType type) {
+ if (value == null) {
+ return null;
+ }
+ switch (type.getSqlTypeName()) {
+ case CHAR:
+ return ((NlsString) value).getValue();
+ case BINARY:
+ return ((BitString) value).getAsByteArray();
+ case DECIMAL:
+ return value;
+ case BIGINT:
+ return ((BigDecimal) value).longValue();
+ case INTEGER:
+ return ((BigDecimal) value).intValue();
+ case SMALLINT:
+ return ((BigDecimal) value).shortValue();
+ case TINYINT:
+ return ((BigDecimal) value).byteValue();
+ case DOUBLE:
+ return ((BigDecimal) value).doubleValue();
+ case REAL:
+ return ((BigDecimal) value).floatValue();
+ case FLOAT:
+ return ((BigDecimal) value).floatValue();
+ case DATE:
+ return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
+ case TIME:
+ return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000);
+ case TIMESTAMP:
+ return SqlDateTimeUtils.unixTimestampToLocalDateTime(((TimestampString) value).getMillisSinceEpoch());
+ default:
+ throw new RuntimeException("Not support type: " + type);
+ }
+ }
+
+ private static Object getValue(SqlNode right) throws NonLiteralException {
+ switch (right.getKind()) {
+ case ARRAY_VALUE_CONSTRUCTOR:
+ final List<Object> list = new ArrayList<>();
+ for (SqlNode o : ((SqlCall) right).getOperandList()) {
+ list.add(getValue(o));
+ }
+ return ImmutableNullableList.copyOf(list).toArray();
+ case MAP_VALUE_CONSTRUCTOR:
+ final Map<Object, Object> map = new HashMap<>();
+ final List<SqlNode> operands = ((SqlCall) right).getOperandList();
+ for (int i = 0; i < operands.size(); i += 2) {
+ final SqlNode key = operands.get(i);
+ final SqlNode value = operands.get(i + 1);
+ map.put(getValue(key), getValue(value));
+ }
+ return map;
+ default:
+ if (SqlUtil.isNullLiteral(right, true)) {
+ return null;
+ }
+ if (SqlUtil.isLiteral(right)) {
+ return ((SqlLiteral) right).getValue();
+ }
+ if (right.getKind() == SqlKind.DEFAULT) {
+ return null; // currently NULL is the only default value
+ }
+ throw new NonLiteralException();
+ }
+ }
+
+ /** Thrown when a non-literal occurs in an argument to a user-defined
+ * table macro. */
+ private static class NonLiteralException extends Exception {
+ }
+
+ public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) {
+ return new HiveOperandTypeChecker(name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
+ }
+
+ /**
+ * Checker for remember previousArgTypes.
+ *
+ * @deprecated TODO hack code, should modify calcite getRowType to pass operand types
+ */
+ @Deprecated
+ public static class HiveOperandTypeChecker extends OperandTypeChecker {
+
+ private LogicalType[] previousArgTypes;
+
+ private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
+ super(name, udtf, methods);
+ }
+
+ @Override
+ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
+ previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding);
+ return super.checkOperandTypes(callBinding, throwOnFailure);
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 2a2119d..e0f4763 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -279,7 +279,8 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod
tableFunction,
resultType,
typeFactory,
- function);
+ function,
+ scala.Option.empty());
List<RexNode> parameters = convertToRexNodes(calculatedTable.getParameters());
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
index cf861bf..c292c64 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
@@ -69,15 +69,19 @@ object CorrelateCodeGenerator {
val sqlFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
// we need result Type to do code generation
val arguments = UserDefinedFunctionUtils.transformRexNodes(rexCall.operands)
+ val operandTypes = rexCall.operands
+ .map(_.getType)
+ .map(FlinkTypeFactory.toLogicalType).toArray
+ val func = sqlFunction.makeFunction(arguments, operandTypes)
val argTypes = getEvalMethodSignature(
- sqlFunction.getTableFunction,
+ func,
rexCall.operands
.map(_.getType)
.map(FlinkTypeFactory.toLogicalType).toArray)
val udtfExternalType = sqlFunction
.getFunction
.asInstanceOf[FlinkTableFunction]
- .getExternalResultType(arguments, argTypes)
+ .getExternalResultType(func, arguments, argTypes)
val pojoFieldMapping = Some(UserDefinedFunctionUtils.getFieldInfo(udtfExternalType)._2)
val inputType = FlinkTypeFactory.toLogicalRowType(inputRelType)
val (returnType, swallowInputOnly ) = if (projectProgram.isDefined) {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
index 7c55d73..3a05fe5 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala
@@ -737,7 +737,9 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
.generate(ctx, operands, resultType)
case tsf: TableSqlFunction =>
- new TableFunctionCallGen(tsf.getTableFunction).generate(ctx, operands, resultType)
+ new TableFunctionCallGen(
+ tsf.makeFunction(getOperandLiterals(operands), operands.map(_.resultType).toArray))
+ .generate(ctx, operands, resultType)
// advanced scalar functions
case sqlOperator: SqlOperator =>
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
index 4bf554a..c3f5ac3 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/TableSqlFunction.scala
@@ -26,6 +26,7 @@ import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.planner.plan.schema.FlinkTableFunction
import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromTypeInfoToLogicalType
import org.apache.flink.table.types.DataType
+import org.apache.flink.table.types.logical.LogicalType
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
import org.apache.calcite.sql._
@@ -34,6 +35,7 @@ import org.apache.calcite.sql.`type`._
import org.apache.calcite.sql.parser.SqlParserPos
import org.apache.calcite.sql.validate.{SqlUserDefinedTableFunction, SqlUserDefinedTableMacro}
+import java.lang.reflect.Method
import java.util
/**
@@ -49,22 +51,26 @@ import java.util
class TableSqlFunction(
name: String,
displayName: String,
- udtf: TableFunction[_],
+ val udtf: TableFunction[_],
implicitResultType: DataType,
typeFactory: FlinkTypeFactory,
- functionImpl: FlinkTableFunction)
+ functionImpl: FlinkTableFunction,
+ operandTypeInfer: Option[SqlOperandTypeChecker] = None)
extends SqlUserDefinedTableFunction(
new SqlIdentifier(name, SqlParserPos.ZERO),
ReturnTypes.CURSOR,
+ // type inference has the UNKNOWN operand types.
createOperandTypeInference(name, udtf, typeFactory),
- createOperandTypeChecker(name, udtf),
+ // only checker has the real operand types.
+ operandTypeInfer.getOrElse(createOperandTypeChecker(name, udtf)),
null,
functionImpl) {
/**
* Get the user-defined table function.
*/
- def getTableFunction = udtf
+ def makeFunction(constants: Array[AnyRef], argTypes: Array[LogicalType]): TableFunction[_] =
+ udtf
/**
* Get the type information of the table returned by the table function.
@@ -131,61 +137,61 @@ object TableSqlFunction {
private[flink] def createOperandTypeChecker(
name: String,
udtf: TableFunction[_]): SqlOperandTypeChecker = {
+ new OperandTypeChecker(name, udtf, checkAndExtractMethods(udtf, "eval"))
+ }
+}
- val methods = checkAndExtractMethods(udtf, "eval")
+/**
+ * Operand type checker based on [[TableFunction]] given information.
+ */
+class OperandTypeChecker(
+ name: String, udtf: TableFunction[_], methods: Array[Method]) extends SqlOperandTypeChecker {
- /**
- * Operand type checker based on [[TableFunction]] given information.
- */
- new SqlOperandTypeChecker {
- override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
- s"$opName[${signaturesToString(udtf, "eval")}]"
- }
+ override def getAllowedSignatures(op: SqlOperator, opName: String): String = {
+ s"$opName[${signaturesToString(udtf, "eval")}]"
+ }
- override def getOperandCountRange: SqlOperandCountRange = {
- var min = 254
- var max = -1
- var isVarargs = false
- methods.foreach(m => {
- var len = m.getParameterTypes.length
- if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
- isVarargs = true
- len = len - 1
- }
- max = Math.max(len, max)
- min = Math.min(len, min)
- })
- if (isVarargs) {
- // if eval method is varargs, set max to -1 to skip length check in Calcite
- max = -1
- }
- SqlOperandCountRanges.between(min, max)
+ override def getOperandCountRange: SqlOperandCountRange = {
+ var min = 254
+ var max = -1
+ var isVarargs = false
+ methods.foreach(m => {
+ var len = m.getParameterTypes.length
+ if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
+ isVarargs = true
+ len = len - 1
}
+ max = Math.max(len, max)
+ min = Math.min(len, min)
+ })
+ if (isVarargs) {
+ // if eval method is varargs, set max to -1 to skip length check in Calcite
+ max = -1
+ }
+ SqlOperandCountRanges.between(min, max)
+ }
- override def checkOperandTypes(
- callBinding: SqlCallBinding,
- throwOnFailure: Boolean)
- : Boolean = {
- val operandTypes = getOperandType(callBinding)
-
- if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
- if (throwOnFailure) {
- throw new ValidationException(
- s"Given parameters of function '$name' do not match any signature. \n" +
- s"Actual: ${signatureInternalToString(operandTypes)} \n" +
- s"Expected: ${signaturesToString(udtf, "eval")}")
- } else {
- false
- }
- } else {
- true
- }
+ override def checkOperandTypes(
+ callBinding: SqlCallBinding,
+ throwOnFailure: Boolean)
+ : Boolean = {
+ val operandTypes = getOperandType(callBinding)
+
+ if (getEvalUserDefinedMethod(udtf, operandTypes).isEmpty) {
+ if (throwOnFailure) {
+ throw new ValidationException(
+ s"Given parameters of function '$name' do not match any signature. \n" +
+ s"Actual: ${signatureInternalToString(operandTypes)} \n" +
+ s"Expected: ${signaturesToString(udtf, "eval")}")
+ } else {
+ false
}
-
- override def isOptional(i: Int): Boolean = false
-
- override def getConsistency: Consistency = Consistency.NONE
-
+ } else {
+ true
}
}
+
+ override def isOptional(i: Int): Boolean = false
+
+ override def getConsistency: Consistency = Consistency.NONE
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
index e565a6e..3552a7f 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/functions/utils/UserDefinedFunctionUtils.scala
@@ -802,7 +802,7 @@ object UserDefinedFunctionUtils {
}.toArray
}
- private[table] def buildRelDataType(
+ def buildRelDataType(
typeFactory: RelDataTypeFactory,
resultType: LogicalType,
fieldNames: Array[String],
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
index e2ab608..38f4b03 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalTableFunctionScan.scala
@@ -100,7 +100,7 @@ class FlinkLogicalTableFunctionScanConverter
return false
}
val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]
+ tableFunction.udtf.isInstanceOf[TemporalTableFunction]
}
def convert(rel: RelNode): RelNode = {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index a240b64..62a4872 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -182,11 +182,11 @@ class GetTemporalTableFunctionCall(
}
val tableFunction = rexCall.getOperator.asInstanceOf[TableSqlFunction]
- if (!tableFunction.getTableFunction.isInstanceOf[TemporalTableFunction]) {
+ if (!tableFunction.udtf.isInstanceOf[TemporalTableFunction]) {
return null
}
val temporalTableFunction =
- tableFunction.getTableFunction.asInstanceOf[TemporalTableFunctionImpl]
+ tableFunction.udtf.asInstanceOf[TemporalTableFunctionImpl]
checkState(
rexCall.getOperands.size().equals(1),
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
index 9bf8221..6f467d4 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/DeferredTypeFlinkTableFunction.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.schema
+import org.apache.flink.table.functions
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -39,6 +40,7 @@ class DeferredTypeFlinkTableFunction(
extends FlinkTableFunction(tableFunction) {
override def getExternalResultType(
+ tableFunction: functions.TableFunction[_],
arguments: Array[AnyRef],
argTypes: Array[Class[_]]): DataType = {
// TODO
@@ -56,7 +58,7 @@ class DeferredTypeFlinkTableFunction(
typeFactory: RelDataTypeFactory,
arguments: Array[AnyRef],
argTypes: Array[Class[_]]): RelDataType = {
- val resultType = getExternalResultType(arguments, argTypes)
+ val resultType = getExternalResultType(tableFunction, arguments, argTypes)
val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType)
UserDefinedFunctionUtils.buildRelDataType(
typeFactory, fromDataTypeToLogicalType(resultType), fieldNames, fieldIndexes)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
index 73f5e63..6ecb8e7 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/FlinkTableFunction.scala
@@ -51,6 +51,7 @@ abstract class FlinkTableFunction(
* Returns the Type for usage, i.e. code generation.
*/
def getExternalResultType(
+ tableFunction: functions.TableFunction[_],
arguments: Array[AnyRef],
argTypes: Array[Class[_]]): DataType
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
index 773af90..828a4b6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/TypedFlinkTableFunction.scala
@@ -18,6 +18,7 @@
package org.apache.flink.table.planner.plan.schema
+import org.apache.flink.table.functions
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -41,6 +42,7 @@ class TypedFlinkTableFunction(
extends FlinkTableFunction(tableFunction) {
override def getExternalResultType(
+ tableFunction: functions.TableFunction[_],
arguments: Array[AnyRef],
argTypes: Array[Class[_]]): DataType =
externalResultType
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
index 432f0a2..bd69126 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/UserDefinedFunctionTestUtils.scala
@@ -334,7 +334,7 @@ object UserDefinedFunctionTestUtils {
}
a + b
}
-
+
def eval(a: Long, b: Int): Long = {
eval(a, b.asInstanceOf[Long])
}