Posted to commits@flink.apache.org by ja...@apache.org on 2022/06/04 04:29:27 UTC
[flink] branch master updated: [FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 22f9a6d03f5 [FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)
22f9a6d03f5 is described below
commit 22f9a6d03f5337ffa4f322a526aecefba1539248
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sat Jun 4 12:29:13 2022 +0800
[FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)
---
.../flink/table/catalog/hive/client/HiveShim.java | 6 +
.../table/catalog/hive/client/HiveShimV100.java | 10 +
.../table/catalog/hive/client/HiveShimV310.java | 12 ++
.../factories/HiveFunctionDefinitionFactory.java | 8 +-
.../catalog/hive/util/HiveReflectionUtils.java | 6 +-
.../flink/table/functions/hive/HiveFunction.java | 118 +++++++++---
.../functions/hive/HiveFunctionArguments.java | 86 +++++++++
.../table/functions/hive/HiveGenericUDAF.java | 2 +-
.../flink/table/functions/hive/HiveGenericUDF.java | 15 +-
.../table/functions/hive/HiveGenericUDTF.java | 58 +++---
.../{HiveFunction.java => HiveLegacyFunction.java} | 4 +-
.../table/functions/hive/HiveScalarFunction.java | 96 ++--------
.../flink/table/functions/hive/HiveSimpleUDF.java | 19 +-
.../functions/hive/conversion/HiveInspectors.java | 85 +++++++--
.../functions/hive/util/HiveFunctionUtil.java | 12 +-
.../delegation/hive/HiveParserCalcitePlanner.java | 6 +-
.../planner/delegation/hive/HiveParserUtils.java | 22 ---
.../hive/copy/HiveParserTypeConverter.java | 8 +-
.../hive/TableEnvHiveConnectorITCase.java | 11 +-
.../table/functions/hive/HiveGenericUDFTest.java | 17 +-
.../table/functions/hive/HiveGenericUDTFTest.java | 19 +-
.../table/functions/hive/HiveSimpleUDFTest.java | 69 +------
.../flink/table/module/hive/HiveModuleTest.java | 12 +-
.../catalog/FunctionCatalogOperatorTable.java | 23 +--
.../functions/utils/HiveTableSqlFunction.java | 203 ---------------------
25 files changed, 405 insertions(+), 522 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 4d4b32cc0aa..8e3e3cf2608 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -120,6 +120,9 @@ public interface HiveShim extends Serializable {
*/
Class<?> getDateDataTypeClass();
+ /** Gets writable class for Date type. */
+ Class<?> getDateWritableClass();
+
/**
* Hive Timestamp data type class was changed in Hive 3.1.0.
*
@@ -127,6 +130,9 @@ public interface HiveShim extends Serializable {
*/
Class<?> getTimestampDataTypeClass();
+ /** Gets writable class for Timestamp type. */
+ Class<?> getTimestampWritableClass();
+
/**
* Generate Hive ColumnStatisticsData from Flink CatalogColumnStatisticsDataDate for DATE
* columns.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index c440bc53109..955c0e50fa4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -196,11 +196,21 @@ public class HiveShimV100 implements HiveShim {
return java.sql.Date.class;
}
+ @Override
+ public Class<?> getDateWritableClass() {
+ return DateWritable.class;
+ }
+
@Override
public Class<?> getTimestampDataTypeClass() {
return java.sql.Timestamp.class;
}
+ @Override
+ public Class<?> getTimestampWritableClass() {
+ return TimestampWritable.class;
+ }
+
@Override
public ColumnStatisticsData toHiveDateColStats(
CatalogColumnStatisticsDataDate flinkDateColStats) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index a04763cfa2e..0aeccf927bf 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -139,12 +139,24 @@ public class HiveShimV310 extends HiveShimV239 {
return hiveDateClz;
}
+ @Override
+ public Class<?> getDateWritableClass() {
+ initDateTimeClasses();
+ return dateWritableConstructor.getDeclaringClass();
+ }
+
@Override
public Class<?> getTimestampDataTypeClass() {
initDateTimeClasses();
return hiveTimestampClz;
}
+ @Override
+ public Class<?> getTimestampWritableClass() {
+ initDateTimeClasses();
+ return timestampWritableConstructor.getDeclaringClass();
+ }
+
@Override
public Set<String> getNotNullColumns(
IMetaStoreClient client, Configuration conf, String dbName, String tableName) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 964a80663f5..4041735ec4b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -26,14 +26,12 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.factories.FunctionDefinitionFactory;
import org.apache.flink.table.functions.AggregateFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
import org.apache.flink.table.functions.hive.HiveGenericUDAF;
import org.apache.flink.table.functions.hive.HiveGenericUDF;
import org.apache.flink.table.functions.hive.HiveGenericUDTF;
import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDF;
@@ -99,11 +97,7 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
return new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
} else if (GenericUDTF.class.isAssignableFrom(clazz)) {
LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
-
- HiveGenericUDTF udtf =
- new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-
- return new TableFunctionDefinition(name, udtf, GenericTypeInfo.of(Row.class));
+ return new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
} else if (GenericUDAFResolver2.class.isAssignableFrom(clazz)
|| UDAF.class.isAssignableFrom(clazz)) {
HiveGenericUDAF udaf;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
index 88b9e717b99..0128cea1c9c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -67,10 +67,10 @@ public class HiveReflectionUtils {
}
}
- public static ObjectInspector createConstantObjectInspector(String className, Object value) {
+ public static ObjectInspector createConstantObjectInspector(
+ String className, Class<?> valueClz, Object value) {
try {
- Constructor<?> method =
- Class.forName(className).getDeclaredConstructor(value.getClass());
+ Constructor<?> method = Class.forName(className).getDeclaredConstructor(valueClz);
method.setAccessible(true);
return (ObjectInspector) method.newInstance(value);
} catch (ClassNotFoundException
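Context for the signature change above: createConstantObjectInspector used to reflect on value.getClass(), which cannot work once a constant may be null, so callers now name the constructor's parameter class explicitly. A minimal sketch of the new call shape, assuming a DATE constant (writableValue is a hypothetical, possibly-null writable value; the inspector class and shim method are the ones used in this patch):

    ObjectInspector inspector =
            HiveReflectionUtils.createConstantObjectInspector(
                    WritableConstantDateObjectInspector.class.getName(),
                    hiveShim.getDateWritableClass(), // Date writable class differs across Hive versions
                    writableValue);                  // may be null for a null literal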
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
index 8804f5e62d2..c51a3c78370 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
@@ -19,38 +19,104 @@
package org.apache.flink.table.functions.hive;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategy;
-/**
- * Interface for Hive simple udf, generic udf, and generic udtf. TODO: Note: this is only a
- * temporary interface for workaround when Flink type system and udf system rework is not finished.
- * Should adapt to Flink type system and Flink UDF framework later on.
- */
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Interface for Hive UDF, UDTF, UDAF. */
@Internal
-public interface HiveFunction {
+public interface HiveFunction<UDFType> {
- /**
- * Set arguments and argTypes for Function instance. In this way, the correct method can be
- * really deduced by the function instance.
- *
- * @param constantArguments arguments of a function call (only literal arguments are passed,
- * nulls for non-literal ones)
- * @param argTypes types of arguments
- */
- void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
+ /** Sets input arguments for the function. */
+ void setArguments(CallContext callContext);
/**
- * Get result type by arguments and argTypes.
- *
- * <p>We can't use getResultType(Object[], Class[]). The Class[] is the classes of what is
- * defined in eval(), for example, if eval() is "public Integer eval(Double)", the argTypes
- * would be Class[Double]. However, in our wrapper, the signature of eval() is "public Object
- * eval(Object... args)", which means we cannot get any info from the interface.
+ * Infers the return type of the function. This method should be called after {@link
+ * #setArguments(CallContext)} is called.
*
- * @param constantArguments arguments of a function call (only literal arguments are passed,
- * nulls for non-literal ones)
- * @param argTypes types of arguments
- * @return result type.
+ * @return The inferred return type.
+ * @throws UDFArgumentException can be thrown if the input arguments are invalid.
*/
- DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
+ DataType inferReturnType() throws UDFArgumentException;
+
+ /** Gets the wrapper for the Hive function. */
+ HiveFunctionWrapper<UDFType> getFunctionWrapper();
+
+ /** Creates {@link TypeInference} for the function. */
+ default TypeInference createTypeInference() {
+ TypeInference.Builder builder = TypeInference.newBuilder();
+ builder.inputTypeStrategy(new HiveFunctionInputStrategy(this));
+ builder.outputTypeStrategy(new HiveFunctionOutputStrategy(this));
+ return builder.build();
+ }
+
+ /** InputTypeStrategy for Hive UDF, UDTF, UDAF. */
+ class HiveFunctionInputStrategy implements InputTypeStrategy {
+
+ private final HiveFunction<?> hiveFunction;
+
+ public HiveFunctionInputStrategy(HiveFunction<?> hiveFunction) {
+ this.hiveFunction = hiveFunction;
+ }
+
+ @Override
+ public ArgumentCount getArgumentCount() {
+ return ConstantArgumentCount.any();
+ }
+
+ @Override
+ public Optional<List<DataType>> inferInputTypes(
+ CallContext callContext, boolean throwOnFailure) {
+ hiveFunction.setArguments(callContext);
+ try {
+ hiveFunction.inferReturnType();
+ } catch (UDFArgumentException e) {
+ if (throwOnFailure) {
+ throw callContext.newValidationError(
+ "Cannot find a suitable Hive function from %s for the input arguments",
+ hiveFunction.getFunctionWrapper().getClassName());
+ } else {
+ return Optional.empty();
+ }
+ }
+ return Optional.of(callContext.getArgumentDataTypes());
+ }
+
+ @Override
+ public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
+ return Collections.singletonList(Signature.of(Signature.Argument.of("*")));
+ }
+ }
+
+ /** OutputTypeStrategy for Hive UDF, UDTF, UDAF. */
+ class HiveFunctionOutputStrategy implements TypeStrategy {
+
+ private final HiveFunction<?> hiveFunction;
+
+ public HiveFunctionOutputStrategy(HiveFunction<?> hiveFunction) {
+ this.hiveFunction = hiveFunction;
+ }
+
+ @Override
+ public Optional<DataType> inferType(CallContext callContext) {
+ hiveFunction.setArguments(callContext);
+ try {
+ return Optional.of(hiveFunction.inferReturnType());
+ } catch (UDFArgumentException e) {
+ throw new FlinkHiveUDFException(e);
+ }
+ }
+ }
}
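With the two strategies above, a concrete function only has to implement setArguments and inferReturnType and can reuse createTypeInference() wholesale. A minimal sketch of resolving the output type through it, mirroring what the updated tests below do (hiveFunction and callContext are placeholders):

    TypeInference inference = hiveFunction.createTypeInference();
    Optional<DataType> resultType =
            inference.getOutputTypeStrategy().inferType(callContext);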
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java
new file mode 100644
index 00000000000..6484591623f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/** Stores argument information for a Hive function. */
+public class HiveFunctionArguments implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ // input arguments -- store the value if an argument is literal, null otherwise
+ private final Object[] args;
+ // data type of each argument
+ private final DataType[] argTypes;
+ // store the indices of literal arguments, so that we can support null literals
+ private final BitSet literalIndices;
+
+ private HiveFunctionArguments(Object[] args, DataType[] argTypes, BitSet literalIndices) {
+ this.args = args;
+ this.argTypes = argTypes;
+ this.literalIndices = literalIndices;
+ }
+
+ public int size() {
+ return args.length;
+ }
+
+ public boolean isLiteral(int pos) {
+ return pos >= 0 && pos < args.length && literalIndices.get(pos);
+ }
+
+ public Object getArg(int pos) {
+ return args[pos];
+ }
+
+ public DataType getDataType(int pos) {
+ return argTypes[pos];
+ }
+
+ // create from a CallContext
+ public static HiveFunctionArguments create(CallContext callContext) {
+ DataType[] argTypes = callContext.getArgumentDataTypes().toArray(new DataType[0]);
+ Object[] args = new Object[argTypes.length];
+ BitSet literalIndices = new BitSet(args.length);
+ for (int i = 0; i < args.length; i++) {
+ if (callContext.isArgumentLiteral(i)) {
+ literalIndices.set(i);
+ args[i] =
+ callContext
+ .getArgumentValue(
+ i, argTypes[i].getLogicalType().getDefaultConversion())
+ .orElse(null);
+ // we always use string type for a string constant arg because that's what
+ // Hive UDFs expect. The type may be char when the function is called in
+ // Flink SQL, since Calcite treats string literals as char type.
+ if (args[i] instanceof String) {
+ argTypes[i] = DataTypes.STRING();
+ }
+ }
+ }
+ return new HiveFunctionArguments(args, argTypes, literalIndices);
+ }
+}
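The reworked tests below build this class straight from a CallContextMock; roughly, as a sketch of that pattern (constantArgs and argTypes are the test inputs):

    CallContextMock callContext = new CallContextMock();
    callContext.argumentDataTypes = Arrays.asList(argTypes);
    callContext.argumentValues =
            Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
    callContext.argumentLiterals =
            Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
    HiveFunctionArguments arguments = HiveFunctionArguments.create(callContext);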
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 99eb3b79c2e..0be3b3b1fb6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -48,7 +48,7 @@ import java.util.Arrays;
@Internal
public class HiveGenericUDAF
extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer>
- implements HiveFunction {
+ implements HiveLegacyFunction {
private final HiveFunctionWrapper hiveFunctionWrapper;
// Flag that indicates whether a bridge between GenericUDAF and UDAF is required.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
index a23a2539e3b..cca3a9ce4b8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
@@ -40,8 +40,9 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDF.class);
+ private final HiveShim hiveShim;
+
private transient GenericUDF.DeferredObject[] deferredObjects;
- private HiveShim hiveShim;
public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim) {
super(hiveFunctionWrapper);
@@ -56,8 +57,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
function = createFunction();
- ObjectInspector[] argInspectors =
- HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+ ObjectInspector[] argInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
try {
returnInspector = function.initializeAndFoldConstants(argInspectors);
@@ -65,12 +65,12 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
throw new FlinkHiveUDFException(e);
}
- deferredObjects = new GenericUDF.DeferredObject[argTypes.length];
+ deferredObjects = new GenericUDF.DeferredObject[arguments.size()];
for (int i = 0; i < deferredObjects.length; i++) {
deferredObjects[i] =
new DeferredObjectAdapter(
- argInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+ argInspectors[i], arguments.getDataType(i).getLogicalType(), hiveShim);
}
}
@@ -93,12 +93,11 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
}
@Override
- protected DataType inferReturnType() throws UDFArgumentException {
+ public DataType inferReturnType() throws UDFArgumentException {
LOG.info(
"Getting result type of HiveGenericUDF from {}",
hiveFunctionWrapper.getClassName());
- ObjectInspector[] argumentInspectors =
- HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+ ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
ObjectInspector resultObjectInspector =
createFunction().initializeAndFoldConstants(argumentInspectors);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
index 2251fea6c01..616ff096dec 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.functions.hive;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.FunctionContext;
@@ -29,8 +29,9 @@ import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
import org.apache.flink.table.functions.hive.util.HiveFunctionUtil;
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -48,13 +49,13 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/** A TableFunction implementation that calls Hive's {@link GenericUDTF}. */
@Internal
-public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction {
+public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction<GenericUDTF> {
private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDTF.class);
private final HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper;
+ private final HiveShim hiveShim;
- private Object[] constantArguments;
- private DataType[] argTypes;
+ private HiveFunctionArguments arguments;
private transient GenericUDTF function;
private transient StructObjectInspector returnInspector;
@@ -62,7 +63,6 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
private transient boolean allIdentityConverter;
private transient HiveObjectConversion[] conversions;
- private HiveShim hiveShim;
public HiveGenericUDTF(
HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim) {
@@ -80,17 +80,18 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
HiveGenericUDTF.this.collect(row);
});
- ObjectInspector[] argumentInspectors =
- HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+ ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
returnInspector = function.initialize(argumentInspectors);
- isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(argTypes);
+ isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(arguments);
conversions = new HiveObjectConversion[argumentInspectors.length];
for (int i = 0; i < argumentInspectors.length; i++) {
conversions[i] =
HiveInspectors.getConversion(
- argumentInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+ argumentInspectors[i],
+ arguments.getDataType(i).getLogicalType(),
+ hiveShim);
}
allIdentityConverter =
@@ -126,35 +127,32 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
}
@Override
- public void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes) {
- this.constantArguments = constantArguments;
- this.argTypes = argTypes;
+ public void close() throws Exception {
+ function.close();
}
@Override
- public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
- LOG.info(
- "Getting result type of HiveGenericUDTF with {}",
- hiveFunctionWrapper.getClassName());
+ public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+ return createTypeInference();
+ }
- try {
- ObjectInspector[] argumentInspectors =
- HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
- return HiveTypeUtil.toFlinkType(
- hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
- } catch (UDFArgumentException e) {
- throw new FlinkHiveUDFException(e);
- }
+ @Override
+ public void setArguments(CallContext callContext) {
+ arguments = HiveFunctionArguments.create(callContext);
}
@Override
- public TypeInformation getResultType() {
- return TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(
- getHiveResultType(this.constantArguments, this.argTypes).getLogicalType());
+ public DataType inferReturnType() throws UDFArgumentException {
+ LOG.info(
+ "Getting result type of HiveGenericUDTF with {}",
+ hiveFunctionWrapper.getClassName());
+ ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
+ return HiveTypeUtil.toFlinkType(
+ hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
}
@Override
- public void close() throws Exception {
- function.close();
+ public HiveFunctionWrapper<GenericUDTF> getFunctionWrapper() {
+ return hiveFunctionWrapper;
}
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
similarity index 96%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
index 8804f5e62d2..77759a65422 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
@@ -18,7 +18,6 @@
package org.apache.flink.table.functions.hive;
-import org.apache.flink.annotation.Internal;
import org.apache.flink.table.types.DataType;
/**
@@ -26,8 +25,7 @@ import org.apache.flink.table.types.DataType;
* temporary interface for workaround when Flink type system and udf system rework is not finished.
* Should adapt to Flink type system and Flink UDF framework later on.
*/
-@Internal
-public interface HiveFunction {
+public interface HiveLegacyFunction {
/**
* Set arguments and argTypes for Function instance. In this way, the correct method can be
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
index 084089e6acd..0620910f9da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
@@ -19,41 +19,27 @@
package org.apache.flink.table.functions.hive;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.hive.util.HiveFunctionUtil;
-import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.InputTypeStrategy;
-import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
/**
* Abstract class to provide more information for Hive {@link UDF} and {@link GenericUDF} functions.
*/
@Internal
-public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
+public abstract class HiveScalarFunction<UDFType> extends ScalarFunction
+ implements HiveFunction<UDFType> {
protected final HiveFunctionWrapper<UDFType> hiveFunctionWrapper;
- protected Object[] constantArguments;
- protected DataType[] argTypes;
+ protected HiveFunctionArguments arguments;
protected transient UDFType function;
protected transient ObjectInspector returnInspector;
@@ -82,15 +68,12 @@ public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
public void open(FunctionContext context) {
openInternal();
- isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(argTypes);
+ isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(arguments);
}
@Override
public TypeInference getTypeInference(DataTypeFactory typeFactory) {
- TypeInference.Builder builder = TypeInference.newBuilder();
- builder.inputTypeStrategy(new HiveUDFInputStrategy());
- builder.outputTypeStrategy(new HiveUDFOutputStrategy());
- return builder.build();
+ return createTypeInference();
}
/** See {@link ScalarFunction#open(FunctionContext)}. */
@@ -113,70 +96,13 @@ public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
/** Evaluation logic; args will be wrapped when the input is a single array. */
protected abstract Object evalInternal(Object[] args);
- private void setArguments(CallContext callContext) {
- DataType[] inputTypes = callContext.getArgumentDataTypes().toArray(new DataType[0]);
- Object[] constantArgs = new Object[inputTypes.length];
- for (int i = 0; i < constantArgs.length; i++) {
- if (callContext.isArgumentLiteral(i)) {
- constantArgs[i] =
- callContext
- .getArgumentValue(
- i,
- ClassLogicalTypeConverter.getDefaultExternalClassForType(
- inputTypes[i].getLogicalType()))
- .orElse(null);
- }
- }
- this.constantArguments = constantArgs;
- this.argTypes = inputTypes;
- }
-
- /** Infer return type of this function call. */
- protected abstract DataType inferReturnType() throws UDFArgumentException;
-
- private class HiveUDFOutputStrategy implements TypeStrategy {
-
- @Override
- public Optional<DataType> inferType(CallContext callContext) {
- setArguments(callContext);
- try {
- return Optional.of(inferReturnType());
- } catch (UDFArgumentException e) {
- throw new FlinkHiveUDFException(e);
- }
- }
+ @Override
+ public void setArguments(CallContext callContext) {
+ arguments = HiveFunctionArguments.create(callContext);
}
- private class HiveUDFInputStrategy implements InputTypeStrategy {
-
- @Override
- public ArgumentCount getArgumentCount() {
- return ConstantArgumentCount.any();
- }
-
- @Override
- public Optional<List<DataType>> inferInputTypes(
- CallContext callContext, boolean throwOnFailure) {
- setArguments(callContext);
- try {
- inferReturnType();
- } catch (UDFArgumentException e) {
- if (throwOnFailure) {
- throw new ValidationException(
- String.format(
- "Cannot find a suitable Hive function from %s for the input arguments",
- hiveFunctionWrapper.getClassName()),
- e);
- } else {
- return Optional.empty();
- }
- }
- return Optional.of(callContext.getArgumentDataTypes());
- }
-
- @Override
- public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
- return Collections.singletonList(Signature.of(Signature.Argument.of("*")));
- }
+ @Override
+ public HiveFunctionWrapper<UDFType> getFunctionWrapper() {
+ return hiveFunctionWrapper;
}
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index a4427317b97..23adb64f043 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -51,11 +51,12 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
private static final Logger LOG = LoggerFactory.getLogger(HiveSimpleUDF.class);
+ private final HiveShim hiveShim;
+
private transient Method method;
private transient GenericUDFUtils.ConversionHelper conversionHelper;
private transient HiveObjectConversion[] conversions;
private transient boolean allIdentityConverter;
- private HiveShim hiveShim;
public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim) {
super(hiveFunctionWrapper);
@@ -71,8 +72,8 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
List<TypeInfo> typeInfos = new ArrayList<>();
- for (DataType arg : argTypes) {
- typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arg, false));
+ for (int i = 0; i < arguments.size(); i++) {
+ typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
}
try {
@@ -83,7 +84,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
ObjectInspector[] argInspectors = new ObjectInspector[typeInfos.size()];
- for (int i = 0; i < argTypes.length; i++) {
+ for (int i = 0; i < arguments.size(); i++) {
argInspectors[i] =
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfos.get(i));
}
@@ -93,7 +94,9 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
for (int i = 0; i < argInspectors.length; i++) {
conversions[i] =
HiveInspectors.getConversion(
- argInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+ argInspectors[i],
+ arguments.getDataType(i).getLogicalType(),
+ hiveShim);
}
allIdentityConverter =
@@ -128,10 +131,10 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
}
@Override
- protected DataType inferReturnType() throws UDFArgumentException {
+ public DataType inferReturnType() throws UDFArgumentException {
List<TypeInfo> argTypeInfo = new ArrayList<>();
- for (DataType argType : argTypes) {
- argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(argType, false));
+ for (int i = 0; i < arguments.size(); i++) {
+ argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
}
Method evalMethod =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 8169175dccd..0dfd2908a5b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.functions.hive.HiveFunctionArguments;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.CharType;
@@ -36,6 +37,9 @@ import org.apache.flink.types.Row;
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -83,8 +87,16 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.lang.reflect.Array;
import java.lang.reflect.Constructor;
@@ -104,6 +116,34 @@ import java.util.Map;
@Internal
public class HiveInspectors {
+ /** Get object inspector for each function argument. */
+ public static ObjectInspector[] getArgInspectors(
+ HiveShim hiveShim, HiveFunctionArguments arguments) {
+ ObjectInspector[] inspectors = new ObjectInspector[arguments.size()];
+ for (int i = 0; i < inspectors.length; i++) {
+ if (arguments.isLiteral(i)) {
+ Object constant = arguments.getArg(i);
+ PrimitiveTypeInfo primitiveTypeInfo =
+ (PrimitiveTypeInfo)
+ HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false);
+ constant =
+ getConversion(
+ getObjectInspector(primitiveTypeInfo),
+ arguments.getDataType(i).getLogicalType(),
+ hiveShim)
+ .toHiveObject(constant);
+ inspectors[i] =
+ getObjectInspectorForPrimitiveConstant(
+ primitiveTypeInfo, constant, hiveShim);
+ } else {
+ inspectors[i] =
+ TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+ HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
+ }
+ }
+ return inspectors;
+ }
+
/** Get an array of ObjectInspector from the given array of args and their types. */
public static ObjectInspector[] toInspectors(
HiveShim hiveShim, Object[] args, DataType[] argTypes) {
@@ -391,39 +431,47 @@ public class HiveInspectors {
}
private static ObjectInspector getObjectInspectorForPrimitiveConstant(
- PrimitiveTypeInfo primitiveTypeInfo, @Nonnull Object value, HiveShim hiveShim) {
+ PrimitiveTypeInfo primitiveTypeInfo, @Nullable Object value, HiveShim hiveShim) {
String className;
value = hiveShim.hivePrimitiveToWritable(value);
switch (primitiveTypeInfo.getPrimitiveCategory()) {
case BOOLEAN:
className = WritableConstantBooleanObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, BooleanWritable.class, value);
case BYTE:
className = WritableConstantByteObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, ByteWritable.class, value);
case SHORT:
className = WritableConstantShortObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, ShortWritable.class, value);
case INT:
className = WritableConstantIntObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, IntWritable.class, value);
case LONG:
className = WritableConstantLongObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, LongWritable.class, value);
case FLOAT:
className = WritableConstantFloatObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, FloatWritable.class, value);
case DOUBLE:
className = WritableConstantDoubleObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, DoubleWritable.class, value);
case STRING:
className = WritableConstantStringObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, Text.class, value);
case CHAR:
try {
Constructor<WritableConstantHiveCharObjectInspector> constructor =
WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(
- CharTypeInfo.class, value.getClass());
+ CharTypeInfo.class, HiveCharWritable.class);
constructor.setAccessible(true);
return constructor.newInstance(primitiveTypeInfo, value);
} catch (Exception e) {
@@ -434,7 +482,7 @@ public class HiveInspectors {
try {
Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(
- VarcharTypeInfo.class, value.getClass());
+ VarcharTypeInfo.class, HiveVarcharWritable.class);
constructor.setAccessible(true);
return constructor.newInstance(primitiveTypeInfo, value);
} catch (Exception e) {
@@ -443,15 +491,17 @@ public class HiveInspectors {
}
case DATE:
className = WritableConstantDateObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, hiveShim.getDateWritableClass(), value);
case TIMESTAMP:
className = WritableConstantTimestampObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, hiveShim.getTimestampWritableClass(), value);
case DECIMAL:
try {
Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(
- DecimalTypeInfo.class, value.getClass());
+ DecimalTypeInfo.class, HiveDecimalWritable.class);
constructor.setAccessible(true);
return constructor.newInstance(primitiveTypeInfo, value);
} catch (Exception e) {
@@ -460,13 +510,14 @@ public class HiveInspectors {
}
case BINARY:
className = WritableConstantBinaryObjectInspector.class.getName();
- return HiveReflectionUtils.createConstantObjectInspector(className, value);
+ return HiveReflectionUtils.createConstantObjectInspector(
+ className, ByteWritable.class, value);
case UNKNOWN:
case VOID:
// If the type is null, we use the constant String inspector as a replacement
className = WritableConstantStringObjectInspector.class.getName();
return HiveReflectionUtils.createConstantObjectInspector(
- className, value.toString());
+ className, Text.class, value == null ? null : value.toString());
default:
throw new FlinkHiveUDFException(
String.format(
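The new getArgInspectors is the single entry point used by the function classes above: literal positions get constant inspectors (after converting the value to its writable form), all other positions get standard Java inspectors. A sketch of the call, assuming arguments built as in the previous snippet:

    ObjectInspector[] argInspectors =
            HiveInspectors.getArgInspectors(hiveShim, arguments);
    // argInspectors[i] is a WritableConstant*ObjectInspector for a literal,
    // otherwise a standard Java object inspector for the argument's type.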
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
index 824e8250769..681334a78cd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.functions.hive.util;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.functions.hive.HiveFunctionArguments;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -28,15 +29,14 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
/** Util for Hive functions. */
@Internal
public class HiveFunctionUtil {
- public static boolean isSingleBoxedArray(DataType[] argTypes) {
- for (DataType dataType : argTypes) {
- if (HiveFunctionUtil.isPrimitiveArray(dataType)) {
+ public static boolean isSingleBoxedArray(HiveFunctionArguments arguments) {
+ for (int i = 0; i < arguments.size(); i++) {
+ if (HiveFunctionUtil.isPrimitiveArray(arguments.getDataType(i))) {
throw new FlinkHiveUDFException(
- "Flink doesn't support primitive array for Hive functions yet.");
+ "Flink doesn't support primitive array for Hive function yet.");
}
}
-
- return argTypes.length == 1 && HiveFunctionUtil.isArrayType(argTypes[0]);
+ return arguments.size() == 1 && HiveFunctionUtil.isArrayType(arguments.getDataType(0));
}
private static boolean isPrimitiveArray(DataType dataType) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 4411b088be3..60f5cda3283 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserWindowingSp
import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution;
import org.apache.flink.table.types.DataType;
@@ -98,7 +99,6 @@ import org.apache.calcite.sql.SqlAggFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
import org.apache.calcite.sql2rel.DeduplicateCorrelateVariables;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.util.CompositeList;
@@ -2539,9 +2539,9 @@ public class HiveParserCalcitePlanner {
SqlOperator convertedOperator = convertedCall.getOperator();
Preconditions.checkState(
- convertedOperator instanceof SqlUserDefinedTableFunction,
+ convertedOperator instanceof BridgingSqlFunction,
"Expect operator to be "
- + SqlUserDefinedTableFunction.class.getSimpleName()
+ + BridgingSqlFunction.class.getSimpleName()
+ ", actually got "
+ convertedOperator.getClass().getSimpleName());
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 073afb2d956..ae682206abe 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
import org.apache.flink.table.functions.FunctionKind;
import org.apache.flink.table.functions.hive.HiveGenericUDAF;
-import org.apache.flink.table.functions.hive.HiveGenericUDTF;
import org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseDriver;
@@ -48,7 +47,6 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateView
import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
-import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
@@ -891,18 +889,6 @@ public class HiveParserUtils {
|| sqlOperator instanceof HiveAggSqlFunction) {
SqlReturnTypeInference returnTypeInference = sqlOperator.getReturnTypeInference();
return returnTypeInference.inferReturnType(operatorBinding);
- } else if (sqlOperator instanceof HiveTableSqlFunction) {
- HiveGenericUDTF hiveGenericUDTF =
- (HiveGenericUDTF)
- ((HiveTableSqlFunction) sqlOperator)
- .makeFunction(new Object[0], new LogicalType[0]);
- DataType dataType =
- hiveGenericUDTF.getHiveResultType(
- operatorBinding.getConstantOperands(),
- types.stream()
- .map(HiveParserUtils::toDataType)
- .toArray(DataType[]::new));
- return toRelDataType(dataType, dataTypeFactory);
} else {
throw new FlinkHiveException(
"Unsupported SqlOperator class " + sqlOperator.getClass().getName());
@@ -918,14 +904,6 @@ public class HiveParserUtils {
dataTypeFactory);
}
- public static RelDataType toRelDataType(DataType dataType, RelDataTypeFactory dtFactory) {
- try {
- return toRelDataType(HiveTypeUtil.toHiveTypeInfo(dataType, false), dtFactory);
- } catch (SemanticException e) {
- throw new FlinkHiveException(e);
- }
- }
-
public static DataType toDataType(RelDataType relDataType) {
return HiveTypeUtil.toFlinkType(HiveParserTypeConverter.convert(relDataType));
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
index 56ca2b52ffd..caa8df47a21 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -68,7 +69,12 @@ public class HiveParserTypeConverter {
for (ColumnInfo ci : rs.getSignature()) {
if (neededCols == null || neededCols.contains(ci.getInternalName())) {
- fieldTypes.add(convert(ci.getType(), dtFactory));
+ RelDataType relDataType = convert(ci.getType(), dtFactory);
+ // if the type is struct, we make it nullable
+ if (ci.getType().getCategory() == ObjectInspector.Category.STRUCT) {
+ relDataType = dtFactory.createTypeWithNullability(relDataType, true);
+ }
+ fieldTypes.add(relDataType);
fieldNames.add(ci.getInternalName());
}
}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 8ea26b0d0e9..0df52e6b191 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -261,6 +261,8 @@ public class TableEnvHiveConnectorITCase {
tableEnv.executeSql("create table db1.nested (a array<map<int, string>>)");
tableEnv.executeSql(
"create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+ tableEnv.executeSql(
+ "create function json_tuple as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple'");
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "simple")
.addRow(new Object[] {3, Arrays.asList(1, 2, 3)})
.commit();
@@ -287,7 +289,14 @@ public class TableEnvHiveConnectorITCase {
.execute()
.collect());
assertThat(results.toString()).isEqualTo("[+I[{1=a, 2=b}], +I[{3=c}]]");
-
+ results =
+ CollectionUtil.iteratorToList(
+ tableEnv.sqlQuery(
+ "select foo.i, b.role_id from db1.simple foo,"
+ + " lateral table(json_tuple('{\"a\": \"0\", \"b\": \"1\"}', 'a')) as b(role_id)")
+ .execute()
+ .collect());
+ assertThat(results.toString()).isEqualTo("[+I[3, 0]]");
tableEnv.executeSql("create table db1.ts (a array<timestamp>)");
HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "ts")
.addRow(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index cabc275dee5..1b1dd03c884 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -21,11 +21,10 @@ package org.apache.flink.table.functions.hive;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.functions.hive.HiveSimpleUDFTest.HiveUDFCallContext;
import org.apache.flink.table.functions.hive.util.TestGenericUDFArray;
import org.apache.flink.table.functions.hive.util.TestGenericUDFStructSize;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.udf.UDFUnhex;
@@ -45,7 +44,11 @@ import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
import static org.apache.flink.table.HiveVersionTestUtil.HIVE_230_OR_LATER;
import static org.apache.flink.table.HiveVersionTestUtil.HIVE_310_OR_LATER;
@@ -53,7 +56,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link HiveGenericUDF}. */
public class HiveGenericUDFTest {
- private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
+ private static final HiveShim hiveShim =
+ HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
@Test
public void testAbs() {
@@ -287,7 +291,12 @@ public class HiveGenericUDFTest {
HiveGenericUDF udf =
new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
- CallContext callContext = new HiveUDFCallContext(constantArgs, argTypes);
+ CallContextMock callContext = new CallContextMock();
+ callContext.argumentDataTypes = Arrays.asList(argTypes);
+ callContext.argumentValues =
+ Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
+ callContext.argumentLiterals =
+ Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
udf.open(null);
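The CallContextMock population above replaces the hand-rolled HiveUDFCallContext that is deleted further down. Since the same three-field setup recurs in the tests that follow, here it is once as a self-contained sketch (the helper name is illustrative; the public fields are the ones used in this patch):

    private static CallContextMock mockCallContext(Object[] constantArgs, DataType[] argTypes) {
        CallContextMock callContext = new CallContextMock();
        callContext.argumentDataTypes = Arrays.asList(argTypes);
        // a constant argument is modeled as a literal whose value is present
        callContext.argumentValues =
                Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
        callContext.argumentLiterals =
                Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
        return callContext;
    }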
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
index 2819fadff76..b82fd4b5f9e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.flink.types.Row;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -43,6 +44,9 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -147,13 +151,20 @@ public class HiveGenericUDTFTest {
Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
- HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim);
+ CallContextMock callContext = new CallContextMock();
+ callContext.argumentDataTypes = Arrays.asList(argTypes);
+ callContext.argumentValues =
+ Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
+ callContext.argumentLiterals =
+ Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
- udf.setArgumentTypesAndConstants(constantArgs, argTypes);
- udf.getHiveResultType(constantArgs, argTypes);
+ HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim);
+ udf.setArguments(callContext);
+ udf.inferReturnType();
ObjectInspector[] argumentInspectors =
- HiveInspectors.toInspectors(hiveShim, constantArgs, argTypes);
+ HiveInspectors.getArgInspectors(
+ hiveShim, HiveFunctionArguments.create(callContext));
ObjectInspector returnInspector = wrapper.createFunction().initialize(argumentInspectors);
udf.open(null);
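Note the new two-step setup replacing setArgumentTypesAndConstants/getHiveResultType: the UDTF first receives the full call context, then derives its produced row type from it. Condensed from the hunk above (wiring only, no new names):

    HiveGenericUDTF udtf = new HiveGenericUDTF(wrapper, hiveShim);
    udtf.setArguments(callContext); // capture argument types plus literal values
    udtf.inferReturnType();         // derive the produced row type from them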
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index a0d74203673..5a4e864b5b9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -19,13 +19,11 @@
package org.apache.flink.table.functions.hive;
import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.hive.client.HiveShim;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.util.TestHiveUDFArray;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.hive.ql.udf.UDFBase64;
@@ -44,8 +42,7 @@ import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import java.util.Collections;
import static org.assertj.core.api.Assertions.assertThat;
@@ -222,7 +219,10 @@ public class HiveSimpleUDFTest {
new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
// Hive UDF won't have literal args
- CallContext callContext = new HiveUDFCallContext(new Object[0], argTypes);
+ CallContextMock callContext = new CallContextMock();
+ callContext.argumentDataTypes = Arrays.asList(argTypes);
+ callContext.argumentLiterals = Arrays.asList(new Boolean[argTypes.length]);
+ Collections.fill(callContext.argumentLiterals, false);
udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
udf.open(null);
@@ -257,61 +257,4 @@ public class HiveSimpleUDFTest {
return content;
}
}
-
- /** A CallContext implementation for Hive UDF tests. */
- public static class HiveUDFCallContext implements CallContext {
-
- private final Object[] constantArgs;
- private final DataType[] argTypes;
-
- public HiveUDFCallContext(Object[] constantArgs, DataType[] argTypes) {
- this.constantArgs = constantArgs;
- this.argTypes = argTypes;
- }
-
- @Override
- public DataTypeFactory getDataTypeFactory() {
- return null;
- }
-
- @Override
- public FunctionDefinition getFunctionDefinition() {
- return null;
- }
-
- @Override
- public boolean isArgumentLiteral(int pos) {
- return pos >= 0 && pos < constantArgs.length && constantArgs[pos] != null;
- }
-
- @Override
- public boolean isArgumentNull(int pos) {
- return false;
- }
-
- @Override
- public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
- return (Optional<T>) Optional.of(constantArgs[pos]);
- }
-
- @Override
- public String getName() {
- return null;
- }
-
- @Override
- public List<DataType> getArgumentDataTypes() {
- return Arrays.asList(argTypes);
- }
-
- @Override
- public Optional<DataType> getOutputDataType() {
- return Optional.empty();
- }
-
- @Override
- public boolean isGroupedAggregation() {
- return false;
- }
- }
}
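One limitation the deleted HiveUDFCallContext baked in is worth spelling out: isArgumentLiteral was keyed to a non-null constant, so a NULL literal argument could never be modeled, and isArgumentNull always returned false. With CallContextMock the two properties are independent; a hedged sketch of modeling a NULL literal, assuming the mock also exposes an argumentNulls flag (that field is not used in this patch):

    // one argument that is a literal whose value is NULL
    callContext.argumentLiterals = Collections.singletonList(true);
    callContext.argumentNulls = Collections.singletonList(true); // assumption: field exists
    callContext.argumentValues = Collections.singletonList(Optional.empty());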
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 6567c22702d..f7933d9dc5d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -23,10 +23,9 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDFTest.HiveUDFCallContext;
import org.apache.flink.table.module.CoreModule;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
@@ -34,6 +33,8 @@ import org.apache.flink.util.CollectionUtil;
import org.junit.Rule;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_3_9;
@@ -90,7 +91,10 @@ public class HiveModuleTest {
DataType[] inputType = new DataType[] {DataTypes.STRING()};
- CallContext callContext = new HiveUDFCallContext(new Object[0], inputType);
+ CallContextMock callContext = new CallContextMock();
+ callContext.argumentDataTypes = Arrays.asList(inputType);
+ callContext.argumentLiterals = Arrays.asList(new Boolean[inputType.length]);
+ Collections.fill(callContext.argumentLiterals, false);
udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
udf.open(null);
@@ -137,8 +141,6 @@ public class HiveModuleTest {
.collect());
assertThat(results.toString()).isEqualTo("[2018-01-192019-12-27 17:58:23.385]");
- // TODO: null cannot be a constant argument at the moment. This test will make more sense
- // when that changes.
results =
CollectionUtil.iteratorToList(
tEnv.sqlQuery("select concat('ab',cast(null as int))").execute().collect());
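As in HiveSimpleUDFTest, the literal-free setup is spelled out as an allocate-then-fill pair; an equivalent, slightly tighter form would be (illustrative, not part of this patch):

    // every argument is a plain column reference, i.e. not a literal
    callContext.argumentLiterals =
            new java.util.ArrayList<>(java.util.Collections.nCopies(inputType.length, false));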
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index e0e638ba99e..6e8ed1a874b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,7 +19,6 @@
package org.apache.flink.table.planner.catalog;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ContextResolvedFunction;
import org.apache.flink.table.catalog.DataTypeFactory;
@@ -37,14 +36,10 @@ import org.apache.flink.table.planner.calcite.RexFactory;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
-import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
-import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
-import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategies;
import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlFunctionCategory;
@@ -61,7 +56,6 @@ import java.util.List;
import java.util.Optional;
import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
/** Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}. */
@Internal
@@ -123,22 +117,7 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
} else if (definition instanceof TableFunctionDefinition
&& category != null
&& category.isTableFunction()) {
- TableFunctionDefinition def = (TableFunctionDefinition) definition;
- if (isHiveFunc(def.getTableFunction())) {
- DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
- return Optional.of(
- new HiveTableSqlFunction(
- identifier,
- def.getTableFunction(),
- returnType,
- typeFactory,
- new DeferredTypeFlinkTableFunction(
- def.getTableFunction(), returnType),
- HiveTableSqlFunction.operandTypeChecker(
- identifier.toString(), def.getTableFunction())));
- } else {
- return convertTableFunction(identifier, (TableFunctionDefinition) definition);
- }
+ return convertTableFunction(identifier, (TableFunctionDefinition) definition);
}
// new stack
return convertToBridgingSqlFunction(category, resolvedFunction);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
deleted file mode 100644
index 984c3aa7123..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.utils;
-
-import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.utils.DateTimeUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.util.BitString;
-import org.apache.calcite.util.DateString;
-import org.apache.calcite.util.NlsString;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.TimeString;
-import org.apache.calcite.util.TimestampString;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import scala.Tuple3;
-
-import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
-import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
-import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Hive {@link TableSqlFunction}. Overrides getFunction to clone the function and invoke {@code
- * HiveGenericUDTF#setArgumentTypesAndConstants}. Overrides SqlReturnTypeInference to invoke {@code
- * HiveGenericUDTF#getHiveResultType} instead of {@code HiveGenericUDTF#getResultType(Object[],
- * Class[])}.
- *
- * @deprecated TODO hack code; its logic should be integrated into TableSqlFunction
- */
-@Deprecated
-public class HiveTableSqlFunction extends TableSqlFunction {
-
- private final TableFunction hiveUdtf;
- private final HiveOperandTypeChecker operandTypeChecker;
-
- public HiveTableSqlFunction(
- FunctionIdentifier identifier,
- TableFunction hiveUdtf,
- DataType implicitResultType,
- FlinkTypeFactory typeFactory,
- FlinkTableFunction functionImpl,
- HiveOperandTypeChecker operandTypeChecker) {
- super(
- identifier,
- identifier.toString(),
- hiveUdtf,
- implicitResultType,
- typeFactory,
- functionImpl,
- scala.Option.apply(operandTypeChecker));
- this.hiveUdtf = hiveUdtf;
- this.operandTypeChecker = operandTypeChecker;
- }
-
- @Override
- public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
- TableFunction clone;
- try {
- clone = InstantiationUtil.clone(hiveUdtf);
- } catch (IOException | ClassNotFoundException e) {
- throw new RuntimeException(e);
- }
- return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
- }
-
- @Override
- public RelDataType getRowType(RelDataTypeFactory typeFactory, List<Object> arguments) {
- Preconditions.checkNotNull(operandTypeChecker.previousArgTypes);
- FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
- Object[] argumentsArray =
- convertArguments(
- Arrays.stream(operandTypeChecker.previousArgTypes)
- .map(factory::createFieldTypeFromLogicalType)
- .collect(Collectors.toList()),
- arguments);
- DataType resultType =
- fromLogicalTypeToDataType(
- FlinkTypeFactory.toLogicalType(
- invokeGetResultType(
- hiveUdtf,
- argumentsArray,
- operandTypeChecker.previousArgTypes,
- (FlinkTypeFactory) typeFactory)));
- Tuple3<String[], int[], LogicalType[]> fieldInfo =
- UserDefinedFunctionUtils.getFieldInfo(resultType);
- return buildRelDataType(
- typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2());
- }
-
- /**
- * This method is copied from Calcite and modified so as not to rely on Function. TODO:
- * FlinkTableFunction needs to implement getElementType.
- */
- private static Object[] convertArguments(
- List<RelDataType> operandTypes, List<Object> arguments0) {
- List<Object> arguments = new ArrayList<>(arguments0.size());
- // Construct a list of arguments, if they are all constants.
- for (Pair<RelDataType, Object> pair : Pair.zip(operandTypes, arguments0)) {
- arguments.add(coerce(pair.right, pair.left));
- }
- return arguments.toArray();
- }
-
- private static Object coerce(Object value, RelDataType type) {
- if (value == null) {
- return null;
- }
- switch (type.getSqlTypeName()) {
- case CHAR:
- return ((NlsString) value).getValue();
- case BINARY:
- return ((BitString) value).getAsByteArray();
- case DECIMAL:
- return value;
- case BIGINT:
- return ((BigDecimal) value).longValue();
- case INTEGER:
- return ((BigDecimal) value).intValue();
- case SMALLINT:
- return ((BigDecimal) value).shortValue();
- case TINYINT:
- return ((BigDecimal) value).byteValue();
- case DOUBLE:
- return ((BigDecimal) value).doubleValue();
- case REAL:
- return ((BigDecimal) value).floatValue();
- case FLOAT:
- return ((BigDecimal) value).floatValue();
- case DATE:
- return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
- case TIME:
- return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000);
- case TIMESTAMP:
- return DateTimeUtils.toLocalDateTime(
- ((TimestampString) value).getMillisSinceEpoch());
- default:
- throw new RuntimeException("Not support type: " + type);
- }
- }
-
- public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) {
- return new HiveOperandTypeChecker(
- name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
- }
-
- /**
- * Checker that remembers previousArgTypes.
- *
- * @deprecated TODO hack code; Calcite's getRowType should be modified to pass operand types
- */
- @Deprecated
- public static class HiveOperandTypeChecker extends OperandMetadata {
-
- private LogicalType[] previousArgTypes;
-
- private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
- super(name, udtf, methods);
- }
-
- @Override
- public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
- previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding);
- return super.checkOperandTypes(callBinding, throwOnFailure);
- }
- }
-}
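For context on what the deleted coerce helper did: Calcite hands constant operands over in its literal representations (NlsString, BitString, BigDecimal, DateString, ...), and coerce narrowed them to plain Java values according to the declared SQL type. A worked example of the numeric cases (values illustrative):

    // Calcite passes every exact-numeric literal as BigDecimal;
    // coerce() narrowed it per the SQL type of the operand:
    Object raw = new java.math.BigDecimal("42");
    long asBigint = ((java.math.BigDecimal) raw).longValue();     // BIGINT   -> 42L
    int asInteger = ((java.math.BigDecimal) raw).intValue();      // INTEGER  -> 42
    short asSmallint = ((java.math.BigDecimal) raw).shortValue(); // SMALLINT -> (short) 42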