You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/06/04 04:29:27 UTC

[flink] branch master updated: [FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 22f9a6d03f5 [FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)
22f9a6d03f5 is described below

commit 22f9a6d03f5337ffa4f322a526aecefba1539248
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Sat Jun 4 12:29:13 2022 +0800

    [FLINK-15854][hive] Use the new type inference for Hive UDTF (#18958)
---
 .../flink/table/catalog/hive/client/HiveShim.java  |   6 +
 .../table/catalog/hive/client/HiveShimV100.java    |  10 +
 .../table/catalog/hive/client/HiveShimV310.java    |  12 ++
 .../factories/HiveFunctionDefinitionFactory.java   |   8 +-
 .../catalog/hive/util/HiveReflectionUtils.java     |   6 +-
 .../flink/table/functions/hive/HiveFunction.java   | 118 +++++++++---
 .../functions/hive/HiveFunctionArguments.java      |  86 +++++++++
 .../table/functions/hive/HiveGenericUDAF.java      |   2 +-
 .../flink/table/functions/hive/HiveGenericUDF.java |  15 +-
 .../table/functions/hive/HiveGenericUDTF.java      |  58 +++---
 .../{HiveFunction.java => HiveLegacyFunction.java} |   4 +-
 .../table/functions/hive/HiveScalarFunction.java   |  96 ++--------
 .../flink/table/functions/hive/HiveSimpleUDF.java  |  19 +-
 .../functions/hive/conversion/HiveInspectors.java  |  85 +++++++--
 .../functions/hive/util/HiveFunctionUtil.java      |  12 +-
 .../delegation/hive/HiveParserCalcitePlanner.java  |   6 +-
 .../planner/delegation/hive/HiveParserUtils.java   |  22 ---
 .../hive/copy/HiveParserTypeConverter.java         |   8 +-
 .../hive/TableEnvHiveConnectorITCase.java          |  11 +-
 .../table/functions/hive/HiveGenericUDFTest.java   |  17 +-
 .../table/functions/hive/HiveGenericUDTFTest.java  |  19 +-
 .../table/functions/hive/HiveSimpleUDFTest.java    |  69 +------
 .../flink/table/module/hive/HiveModuleTest.java    |  12 +-
 .../catalog/FunctionCatalogOperatorTable.java      |  23 +--
 .../functions/utils/HiveTableSqlFunction.java      | 203 ---------------------
 25 files changed, 405 insertions(+), 522 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
index 4d4b32cc0aa..8e3e3cf2608 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShim.java
@@ -120,6 +120,9 @@ public interface HiveShim extends Serializable {
      */
     Class<?> getDateDataTypeClass();
 
+    /** Gets writable class for Date type. */
+    Class<?> getDateWritableClass();
+
     /**
      * Hive Timestamp data type class was changed in Hive 3.1.0.
      *
@@ -127,6 +130,9 @@ public interface HiveShim extends Serializable {
      */
     Class<?> getTimestampDataTypeClass();
 
+    /** Gets writable class for Timestamp type. */
+    Class<?> getTimestampWritableClass();
+
     /**
      * Generate Hive ColumnStatisticsData from Flink CatalogColumnStatisticsDataDate for DATE
      * columns.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
index c440bc53109..955c0e50fa4 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV100.java
@@ -196,11 +196,21 @@ public class HiveShimV100 implements HiveShim {
         return java.sql.Date.class;
     }
 
+    @Override
+    public Class<?> getDateWritableClass() {
+        return DateWritable.class;
+    }
+
     @Override
     public Class<?> getTimestampDataTypeClass() {
         return java.sql.Timestamp.class;
     }
 
+    @Override
+    public Class<?> getTimestampWritableClass() {
+        return TimestampWritable.class;
+    }
+
     @Override
     public ColumnStatisticsData toHiveDateColStats(
             CatalogColumnStatisticsDataDate flinkDateColStats) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
index a04763cfa2e..0aeccf927bf 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/client/HiveShimV310.java
@@ -139,12 +139,24 @@ public class HiveShimV310 extends HiveShimV239 {
         return hiveDateClz;
     }
 
+    @Override
+    public Class<?> getDateWritableClass() {
+        initDateTimeClasses();
+        return dateWritableConstructor.getDeclaringClass();
+    }
+
     @Override
     public Class<?> getTimestampDataTypeClass() {
         initDateTimeClasses();
         return hiveTimestampClz;
     }
 
+    @Override
+    public Class<?> getTimestampWritableClass() {
+        initDateTimeClasses();
+        return timestampWritableConstructor.getDeclaringClass();
+    }
+
     @Override
     public Set<String> getNotNullColumns(
             IMetaStoreClient client, Configuration conf, String dbName, String tableName) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 964a80663f5..4041735ec4b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -26,14 +26,12 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.AggregateFunctionDefinition;
 import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
 import org.apache.flink.table.functions.hive.HiveGenericUDAF;
 import org.apache.flink.table.functions.hive.HiveGenericUDF;
 import org.apache.flink.table.functions.hive.HiveGenericUDTF;
 import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.ql.exec.UDAF;
 import org.apache.hadoop.hive.ql.exec.UDF;
@@ -99,11 +97,7 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
             return new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
         } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
             LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
-
-            HiveGenericUDTF udtf =
-                    new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-
-            return new TableFunctionDefinition(name, udtf, GenericTypeInfo.of(Row.class));
+            return new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
         } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz)
                 || UDAF.class.isAssignableFrom(clazz)) {
             HiveGenericUDAF udaf;
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
index 88b9e717b99..0128cea1c9c 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/util/HiveReflectionUtils.java
@@ -67,10 +67,10 @@ public class HiveReflectionUtils {
         }
     }
 
-    public static ObjectInspector createConstantObjectInspector(String className, Object value) {
+    public static ObjectInspector createConstantObjectInspector(
+            String className, Class<?> valueClz, Object value) {
         try {
-            Constructor<?> method =
-                    Class.forName(className).getDeclaredConstructor(value.getClass());
+            Constructor<?> method = Class.forName(className).getDeclaredConstructor(valueClz);
             method.setAccessible(true);
             return (ObjectInspector) method.newInstance(value);
         } catch (ClassNotFoundException
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
index 8804f5e62d2..c51a3c78370 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
@@ -19,38 +19,104 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.ArgumentCount;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.ConstantArgumentCount;
+import org.apache.flink.table.types.inference.InputTypeStrategy;
+import org.apache.flink.table.types.inference.Signature;
+import org.apache.flink.table.types.inference.TypeInference;
+import org.apache.flink.table.types.inference.TypeStrategy;
 
-/**
- * Interface for Hive simple udf, generic udf, and generic udtf. TODO: Note: this is only a
- * temporary interface for workaround when Flink type system and udf system rework is not finished.
- * Should adapt to Flink type system and Flink UDF framework later on.
- */
+import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Interface for Hive UDF, UDTF, UDAF. */
 @Internal
-public interface HiveFunction {
+public interface HiveFunction<UDFType> {
 
-    /**
-     * Set arguments and argTypes for Function instance. In this way, the correct method can be
-     * really deduced by the function instance.
-     *
-     * @param constantArguments arguments of a function call (only literal arguments are passed,
-     *     nulls for non-literal ones)
-     * @param argTypes types of arguments
-     */
-    void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes);
+    /** Sets input arguments for the function. */
+    void setArguments(CallContext callContext);
 
     /**
-     * Get result type by arguments and argTypes.
-     *
-     * <p>We can't use getResultType(Object[], Class[]). The Class[] is the classes of what is
-     * defined in eval(), for example, if eval() is "public Integer eval(Double)", the argTypes
-     * would be Class[Double]. However, in our wrapper, the signature of eval() is "public Object
-     * eval(Object... args)", which means we cannot get any info from the interface.
+     * Infers the return type of the function. This method should be called after {@link
+     * #setArguments(CallContext)} is called.
      *
-     * @param constantArguments arguments of a function call (only literal arguments are passed,
-     *     nulls for non-literal ones)
-     * @param argTypes types of arguments
-     * @return result type.
+     * @return The inferred return type.
+     * @throws UDFArgumentException can be thrown if the input arguments are invalid.
      */
-    DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes);
+    DataType inferReturnType() throws UDFArgumentException;
+
+    /** Gets the wrapper for the Hive function. */
+    HiveFunctionWrapper<UDFType> getFunctionWrapper();
+
+    /** Creates {@link TypeInference} for the function. */
+    default TypeInference createTypeInference() {
+        TypeInference.Builder builder = TypeInference.newBuilder();
+        builder.inputTypeStrategy(new HiveFunctionInputStrategy(this));
+        builder.outputTypeStrategy(new HiveFunctionOutputStrategy(this));
+        return builder.build();
+    }
+
+    /** InputTypeStrategy for Hive UDF, UDTF, UDAF. */
+    class HiveFunctionInputStrategy implements InputTypeStrategy {
+
+        private final HiveFunction<?> hiveFunction;
+
+        public HiveFunctionInputStrategy(HiveFunction<?> hiveFunction) {
+            this.hiveFunction = hiveFunction;
+        }
+
+        @Override
+        public ArgumentCount getArgumentCount() {
+            return ConstantArgumentCount.any();
+        }
+
+        @Override
+        public Optional<List<DataType>> inferInputTypes(
+                CallContext callContext, boolean throwOnFailure) {
+            hiveFunction.setArguments(callContext);
+            try {
+                hiveFunction.inferReturnType();
+            } catch (UDFArgumentException e) {
+                if (throwOnFailure) {
+                    throw callContext.newValidationError(
+                            "Cannot find a suitable Hive function from %s for the input arguments",
+                            hiveFunction.getFunctionWrapper().getClassName());
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return Optional.of(callContext.getArgumentDataTypes());
+        }
+
+        @Override
+        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
+            return Collections.singletonList(Signature.of(Signature.Argument.of("*")));
+        }
+    }
+
+    /** OutputTypeStrategy for Hive UDF, UDTF, UDAF. */
+    class HiveFunctionOutputStrategy implements TypeStrategy {
+
+        private final HiveFunction<?> hiveFunction;
+
+        public HiveFunctionOutputStrategy(HiveFunction<?> hiveFunction) {
+            this.hiveFunction = hiveFunction;
+        }
+
+        @Override
+        public Optional<DataType> inferType(CallContext callContext) {
+            hiveFunction.setArguments(callContext);
+            try {
+                return Optional.of(hiveFunction.inferReturnType());
+            } catch (UDFArgumentException e) {
+                throw new FlinkHiveUDFException(e);
+            }
+        }
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java
new file mode 100644
index 00000000000..6484591623f
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionArguments.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+
+import java.io.Serializable;
+import java.util.BitSet;
+
+/** Stores argument information for a Hive function. */
+public class HiveFunctionArguments implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    // input arguments -- store the value if an argument is literal, null otherwise
+    private final Object[] args;
+    // data type of each argument
+    private final DataType[] argTypes;
+    // store the indices of literal arguments, so that we can support null literals
+    private final BitSet literalIndices;
+
+    private HiveFunctionArguments(Object[] args, DataType[] argTypes, BitSet literalIndices) {
+        this.args = args;
+        this.argTypes = argTypes;
+        this.literalIndices = literalIndices;
+    }
+
+    public int size() {
+        return args.length;
+    }
+
+    public boolean isLiteral(int pos) {
+        return pos >= 0 && pos < args.length && literalIndices.get(pos);
+    }
+
+    public Object getArg(int pos) {
+        return args[pos];
+    }
+
+    public DataType getDataType(int pos) {
+        return argTypes[pos];
+    }
+
+    // create from a CallContext
+    public static HiveFunctionArguments create(CallContext callContext) {
+        DataType[] argTypes = callContext.getArgumentDataTypes().toArray(new DataType[0]);
+        Object[] args = new Object[argTypes.length];
+        BitSet literalIndices = new BitSet(args.length);
+        for (int i = 0; i < args.length; i++) {
+            if (callContext.isArgumentLiteral(i)) {
+                literalIndices.set(i);
+                args[i] =
+                        callContext
+                                .getArgumentValue(
+                                        i, argTypes[i].getLogicalType().getDefaultConversion())
+                                .orElse(null);
+                // We always use STRING type for string constant args because that is what Hive
+                // UDFs expect.
+                // The type may be CHAR when the function is called in Flink SQL, because
+                // Calcite treats string literals as CHAR type.
+                if (args[i] instanceof String) {
+                    argTypes[i] = DataTypes.STRING();
+                }
+            }
+        }
+        return new HiveFunctionArguments(args, argTypes, literalIndices);
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index 99eb3b79c2e..0be3b3b1fb6 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -48,7 +48,7 @@ import java.util.Arrays;
 @Internal
 public class HiveGenericUDAF
         extends AggregateFunction<Object, GenericUDAFEvaluator.AggregationBuffer>
-        implements HiveFunction {
+        implements HiveLegacyFunction {
 
     private final HiveFunctionWrapper hiveFunctionWrapper;
     // Flag that indicates whether a bridge between GenericUDAF and UDAF is required.
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
index a23a2539e3b..cca3a9ce4b8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
@@ -40,8 +40,9 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDF.class);
 
+    private final HiveShim hiveShim;
+
     private transient GenericUDF.DeferredObject[] deferredObjects;
-    private HiveShim hiveShim;
 
     public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim) {
         super(hiveFunctionWrapper);
@@ -56,8 +57,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
 
         function = createFunction();
 
-        ObjectInspector[] argInspectors =
-                HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+        ObjectInspector[] argInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
 
         try {
             returnInspector = function.initializeAndFoldConstants(argInspectors);
@@ -65,12 +65,12 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
             throw new FlinkHiveUDFException(e);
         }
 
-        deferredObjects = new GenericUDF.DeferredObject[argTypes.length];
+        deferredObjects = new GenericUDF.DeferredObject[arguments.size()];
 
         for (int i = 0; i < deferredObjects.length; i++) {
             deferredObjects[i] =
                     new DeferredObjectAdapter(
-                            argInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+                            argInspectors[i], arguments.getDataType(i).getLogicalType(), hiveShim);
         }
     }
 
@@ -93,12 +93,11 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
     }
 
     @Override
-    protected DataType inferReturnType() throws UDFArgumentException {
+    public DataType inferReturnType() throws UDFArgumentException {
         LOG.info(
                 "Getting result type of HiveGenericUDF from {}",
                 hiveFunctionWrapper.getClassName());
-        ObjectInspector[] argumentInspectors =
-                HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+        ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
 
         ObjectInspector resultObjectInspector =
                 createFunction().initializeAndFoldConstants(argumentInspectors);
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
index 2251fea6c01..616ff096dec 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
@@ -20,7 +20,7 @@ package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.FunctionContext;
@@ -29,8 +29,9 @@ import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.functions.hive.conversion.HiveObjectConversion;
 import org.apache.flink.table.functions.hive.conversion.IdentityConversion;
 import org.apache.flink.table.functions.hive.util.HiveFunctionUtil;
-import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -48,13 +49,13 @@ import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** A TableFunction implementation that calls Hive's {@link GenericUDTF}. */
 @Internal
-public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction {
+public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction<GenericUDTF> {
     private static final Logger LOG = LoggerFactory.getLogger(HiveGenericUDTF.class);
 
     private final HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper;
+    private final HiveShim hiveShim;
 
-    private Object[] constantArguments;
-    private DataType[] argTypes;
+    private HiveFunctionArguments arguments;
 
     private transient GenericUDTF function;
     private transient StructObjectInspector returnInspector;
@@ -62,7 +63,6 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
 
     private transient boolean allIdentityConverter;
     private transient HiveObjectConversion[] conversions;
-    private HiveShim hiveShim;
 
     public HiveGenericUDTF(
             HiveFunctionWrapper<GenericUDTF> hiveFunctionWrapper, HiveShim hiveShim) {
@@ -80,17 +80,18 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
                     HiveGenericUDTF.this.collect(row);
                 });
 
-        ObjectInspector[] argumentInspectors =
-                HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
+        ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
         returnInspector = function.initialize(argumentInspectors);
 
-        isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(argTypes);
+        isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(arguments);
 
         conversions = new HiveObjectConversion[argumentInspectors.length];
         for (int i = 0; i < argumentInspectors.length; i++) {
             conversions[i] =
                     HiveInspectors.getConversion(
-                            argumentInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+                            argumentInspectors[i],
+                            arguments.getDataType(i).getLogicalType(),
+                            hiveShim);
         }
 
         allIdentityConverter =
@@ -126,35 +127,32 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction
     }
 
     @Override
-    public void setArgumentTypesAndConstants(Object[] constantArguments, DataType[] argTypes) {
-        this.constantArguments = constantArguments;
-        this.argTypes = argTypes;
+    public void close() throws Exception {
+        function.close();
     }
 
     @Override
-    public DataType getHiveResultType(Object[] constantArguments, DataType[] argTypes) {
-        LOG.info(
-                "Getting result type of HiveGenericUDTF with {}",
-                hiveFunctionWrapper.getClassName());
+    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
+        return createTypeInference();
+    }
 
-        try {
-            ObjectInspector[] argumentInspectors =
-                    HiveInspectors.toInspectors(hiveShim, constantArguments, argTypes);
-            return HiveTypeUtil.toFlinkType(
-                    hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
-        } catch (UDFArgumentException e) {
-            throw new FlinkHiveUDFException(e);
-        }
+    @Override
+    public void setArguments(CallContext callContext) {
+        arguments = HiveFunctionArguments.create(callContext);
     }
 
     @Override
-    public TypeInformation getResultType() {
-        return TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo(
-                getHiveResultType(this.constantArguments, this.argTypes).getLogicalType());
+    public DataType inferReturnType() throws UDFArgumentException {
+        LOG.info(
+                "Getting result type of HiveGenericUDTF with {}",
+                hiveFunctionWrapper.getClassName());
+        ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
+        return HiveTypeUtil.toFlinkType(
+                hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
     }
 
     @Override
-    public void close() throws Exception {
-        function.close();
+    public HiveFunctionWrapper<GenericUDTF> getFunctionWrapper() {
+        return hiveFunctionWrapper;
     }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
similarity index 96%
copy from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
copy to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
index 8804f5e62d2..77759a65422 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveLegacyFunction.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.table.functions.hive;
 
-import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.types.DataType;
 
 /**
@@ -26,8 +25,7 @@ import org.apache.flink.table.types.DataType;
  * temporary interface for workaround when Flink type system and udf system rework is not finished.
  * Should adapt to Flink type system and Flink UDF framework later on.
  */
-@Internal
-public interface HiveFunction {
+public interface HiveLegacyFunction {
 
     /**
      * Set arguments and argTypes for Function instance. In this way, the correct method can be
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
index 084089e6acd..0620910f9da 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
@@ -19,41 +19,27 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.hive.util.HiveFunctionUtil;
-import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.ArgumentCount;
 import org.apache.flink.table.types.inference.CallContext;
-import org.apache.flink.table.types.inference.ConstantArgumentCount;
-import org.apache.flink.table.types.inference.InputTypeStrategy;
-import org.apache.flink.table.types.inference.Signature;
 import org.apache.flink.table.types.inference.TypeInference;
-import org.apache.flink.table.types.inference.TypeStrategy;
 
 import org.apache.hadoop.hive.ql.exec.UDF;
-import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
 /**
  * Abstract class to provide more information for Hive {@link UDF} and {@link GenericUDF} functions.
  */
 @Internal
-public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
+public abstract class HiveScalarFunction<UDFType> extends ScalarFunction
+        implements HiveFunction<UDFType> {
 
     protected final HiveFunctionWrapper<UDFType> hiveFunctionWrapper;
 
-    protected Object[] constantArguments;
-    protected DataType[] argTypes;
+    protected HiveFunctionArguments arguments;
 
     protected transient UDFType function;
     protected transient ObjectInspector returnInspector;
@@ -82,15 +68,12 @@ public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
     public void open(FunctionContext context) {
         openInternal();
 
-        isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(argTypes);
+        isArgsSingleArray = HiveFunctionUtil.isSingleBoxedArray(arguments);
     }
 
     @Override
     public TypeInference getTypeInference(DataTypeFactory typeFactory) {
-        TypeInference.Builder builder = TypeInference.newBuilder();
-        builder.inputTypeStrategy(new HiveUDFInputStrategy());
-        builder.outputTypeStrategy(new HiveUDFOutputStrategy());
-        return builder.build();
+        return createTypeInference();
     }
 
     /** See {@link ScalarFunction#open(FunctionContext)}. */
@@ -113,70 +96,13 @@ public abstract class HiveScalarFunction<UDFType> extends ScalarFunction {
     /** Evaluation logical, args will be wrapped when is a single array. */
     protected abstract Object evalInternal(Object[] args);
 
-    private void setArguments(CallContext callContext) {
-        DataType[] inputTypes = callContext.getArgumentDataTypes().toArray(new DataType[0]);
-        Object[] constantArgs = new Object[inputTypes.length];
-        for (int i = 0; i < constantArgs.length; i++) {
-            if (callContext.isArgumentLiteral(i)) {
-                constantArgs[i] =
-                        callContext
-                                .getArgumentValue(
-                                        i,
-                                        ClassLogicalTypeConverter.getDefaultExternalClassForType(
-                                                inputTypes[i].getLogicalType()))
-                                .orElse(null);
-            }
-        }
-        this.constantArguments = constantArgs;
-        this.argTypes = inputTypes;
-    }
-
-    /** Infer return type of this function call. */
-    protected abstract DataType inferReturnType() throws UDFArgumentException;
-
-    private class HiveUDFOutputStrategy implements TypeStrategy {
-
-        @Override
-        public Optional<DataType> inferType(CallContext callContext) {
-            setArguments(callContext);
-            try {
-                return Optional.of(inferReturnType());
-            } catch (UDFArgumentException e) {
-                throw new FlinkHiveUDFException(e);
-            }
-        }
+    @Override
+    public void setArguments(CallContext callContext) {
+        arguments = HiveFunctionArguments.create(callContext);
     }
 
-    private class HiveUDFInputStrategy implements InputTypeStrategy {
-
-        @Override
-        public ArgumentCount getArgumentCount() {
-            return ConstantArgumentCount.any();
-        }
-
-        @Override
-        public Optional<List<DataType>> inferInputTypes(
-                CallContext callContext, boolean throwOnFailure) {
-            setArguments(callContext);
-            try {
-                inferReturnType();
-            } catch (UDFArgumentException e) {
-                if (throwOnFailure) {
-                    throw new ValidationException(
-                            String.format(
-                                    "Cannot find a suitable Hive function from %s for the input arguments",
-                                    hiveFunctionWrapper.getClassName()),
-                            e);
-                } else {
-                    return Optional.empty();
-                }
-            }
-            return Optional.of(callContext.getArgumentDataTypes());
-        }
-
-        @Override
-        public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
-            return Collections.singletonList(Signature.of(Signature.Argument.of("*")));
-        }
+    @Override
+    public HiveFunctionWrapper<UDFType> getFunctionWrapper() {
+        return hiveFunctionWrapper;
     }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index a4427317b97..23adb64f043 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -51,11 +51,12 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
 
     private static final Logger LOG = LoggerFactory.getLogger(HiveSimpleUDF.class);
 
+    private final HiveShim hiveShim;
+
     private transient Method method;
     private transient GenericUDFUtils.ConversionHelper conversionHelper;
     private transient HiveObjectConversion[] conversions;
     private transient boolean allIdentityConverter;
-    private HiveShim hiveShim;
 
     public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim) {
         super(hiveFunctionWrapper);
@@ -71,8 +72,8 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
 
         List<TypeInfo> typeInfos = new ArrayList<>();
 
-        for (DataType arg : argTypes) {
-            typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arg, false));
+        for (int i = 0; i < arguments.size(); i++) {
+            typeInfos.add(HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
         }
 
         try {
@@ -83,7 +84,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
                             ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
             ObjectInspector[] argInspectors = new ObjectInspector[typeInfos.size()];
 
-            for (int i = 0; i < argTypes.length; i++) {
+            for (int i = 0; i < arguments.size(); i++) {
                 argInspectors[i] =
                         TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(typeInfos.get(i));
             }
@@ -93,7 +94,9 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
             for (int i = 0; i < argInspectors.length; i++) {
                 conversions[i] =
                         HiveInspectors.getConversion(
-                                argInspectors[i], argTypes[i].getLogicalType(), hiveShim);
+                                argInspectors[i],
+                                arguments.getDataType(i).getLogicalType(),
+                                hiveShim);
             }
 
             allIdentityConverter =
@@ -128,10 +131,10 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
     }
 
     @Override
-    protected DataType inferReturnType() throws UDFArgumentException {
+    public DataType inferReturnType() throws UDFArgumentException {
         List<TypeInfo> argTypeInfo = new ArrayList<>();
-        for (DataType argType : argTypes) {
-            argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(argType, false));
+        for (int i = 0; i < arguments.size(); i++) {
+            argTypeInfo.add(HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
         }
 
         Method evalMethod =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 8169175dccd..0dfd2908a5b 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.functions.hive.HiveFunctionArguments;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.CharType;
@@ -36,6 +37,9 @@ import org.apache.flink.types.Row;
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -83,8 +87,16 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
 
-import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
@@ -104,6 +116,34 @@ import java.util.Map;
 @Internal
 public class HiveInspectors {
 
+    /** Get object inspector for each function argument. */
+    public static ObjectInspector[] getArgInspectors(
+            HiveShim hiveShim, HiveFunctionArguments arguments) {
+        ObjectInspector[] inspectors = new ObjectInspector[arguments.size()];
+        for (int i = 0; i < inspectors.length; i++) {
+            if (arguments.isLiteral(i)) {
+                Object constant = arguments.getArg(i);
+                PrimitiveTypeInfo primitiveTypeInfo =
+                        (PrimitiveTypeInfo)
+                                HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false);
+                constant =
+                        getConversion(
+                                        getObjectInspector(primitiveTypeInfo),
+                                        arguments.getDataType(i).getLogicalType(),
+                                        hiveShim)
+                                .toHiveObject(constant);
+                inspectors[i] =
+                        getObjectInspectorForPrimitiveConstant(
+                                primitiveTypeInfo, constant, hiveShim);
+            } else {
+                inspectors[i] =
+                        TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(
+                                HiveTypeUtil.toHiveTypeInfo(arguments.getDataType(i), false));
+            }
+        }
+        return inspectors;
+    }
+
     /** Get an array of ObjectInspector from the give array of args and their types. */
     public static ObjectInspector[] toInspectors(
             HiveShim hiveShim, Object[] args, DataType[] argTypes) {
@@ -391,39 +431,47 @@ public class HiveInspectors {
     }
 
     private static ObjectInspector getObjectInspectorForPrimitiveConstant(
-            PrimitiveTypeInfo primitiveTypeInfo, @Nonnull Object value, HiveShim hiveShim) {
+            PrimitiveTypeInfo primitiveTypeInfo, @Nullable Object value, HiveShim hiveShim) {
         String className;
         value = hiveShim.hivePrimitiveToWritable(value);
         switch (primitiveTypeInfo.getPrimitiveCategory()) {
             case BOOLEAN:
                 className = WritableConstantBooleanObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, BooleanWritable.class, value);
             case BYTE:
                 className = WritableConstantByteObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, ByteWritable.class, value);
             case SHORT:
                 className = WritableConstantShortObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, ShortWritable.class, value);
             case INT:
                 className = WritableConstantIntObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, IntWritable.class, value);
             case LONG:
                 className = WritableConstantLongObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, LongWritable.class, value);
             case FLOAT:
                 className = WritableConstantFloatObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, FloatWritable.class, value);
             case DOUBLE:
                 className = WritableConstantDoubleObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, DoubleWritable.class, value);
             case STRING:
                 className = WritableConstantStringObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, Text.class, value);
             case CHAR:
                 try {
                     Constructor<WritableConstantHiveCharObjectInspector> constructor =
                             WritableConstantHiveCharObjectInspector.class.getDeclaredConstructor(
-                                    CharTypeInfo.class, value.getClass());
+                                    CharTypeInfo.class, HiveCharWritable.class);
                     constructor.setAccessible(true);
                     return constructor.newInstance(primitiveTypeInfo, value);
                 } catch (Exception e) {
@@ -434,7 +482,7 @@ public class HiveInspectors {
                 try {
                     Constructor<WritableConstantHiveVarcharObjectInspector> constructor =
                             WritableConstantHiveVarcharObjectInspector.class.getDeclaredConstructor(
-                                    VarcharTypeInfo.class, value.getClass());
+                                    VarcharTypeInfo.class, HiveVarcharWritable.class);
                     constructor.setAccessible(true);
                     return constructor.newInstance(primitiveTypeInfo, value);
                 } catch (Exception e) {
@@ -443,15 +491,17 @@ public class HiveInspectors {
                 }
             case DATE:
                 className = WritableConstantDateObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, hiveShim.getDateWritableClass(), value);
             case TIMESTAMP:
                 className = WritableConstantTimestampObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, hiveShim.getTimestampWritableClass(), value);
             case DECIMAL:
                 try {
                     Constructor<WritableConstantHiveDecimalObjectInspector> constructor =
                             WritableConstantHiveDecimalObjectInspector.class.getDeclaredConstructor(
-                                    DecimalTypeInfo.class, value.getClass());
+                                    DecimalTypeInfo.class, HiveDecimalWritable.class);
                     constructor.setAccessible(true);
                     return constructor.newInstance(primitiveTypeInfo, value);
                 } catch (Exception e) {
@@ -460,13 +510,15 @@ public class HiveInspectors {
                 }
             case BINARY:
                 className = WritableConstantBinaryObjectInspector.class.getName();
-                return HiveReflectionUtils.createConstantObjectInspector(className, value);
+                // WritableConstantBinaryObjectInspector is constructed from a BytesWritable.
+                return HiveReflectionUtils.createConstantObjectInspector(
+                        className, org.apache.hadoop.io.BytesWritable.class, value);
             case UNKNOWN:
             case VOID:
                 // If type is null, we use the Constant String to replace
                 className = WritableConstantStringObjectInspector.class.getName();
                 return HiveReflectionUtils.createConstantObjectInspector(
-                        className, value.toString());
+                        className, Text.class, value == null ? null : value.toString());
             default:
                 throw new FlinkHiveUDFException(
                         String.format(
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
index 824e8250769..681334a78cd 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/util/HiveFunctionUtil.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.functions.hive.util;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.hive.FlinkHiveUDFException;
+import org.apache.flink.table.functions.hive.HiveFunctionArguments;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.ArrayType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -28,15 +29,14 @@ import org.apache.flink.table.types.logical.LogicalTypeRoot;
 /** Util for Hive functions. */
 @Internal
 public class HiveFunctionUtil {
-    public static boolean isSingleBoxedArray(DataType[] argTypes) {
-        for (DataType dataType : argTypes) {
-            if (HiveFunctionUtil.isPrimitiveArray(dataType)) {
+    public static boolean isSingleBoxedArray(HiveFunctionArguments arguments) {
+        for (int i = 0; i < arguments.size(); i++) {
+            if (HiveFunctionUtil.isPrimitiveArray(arguments.getDataType(i))) {
                 throw new FlinkHiveUDFException(
-                        "Flink doesn't support primitive array for Hive functions yet.");
+                        "Flink doesn't support primitive array for Hive function yet.");
             }
         }
-
-        return argTypes.length == 1 && HiveFunctionUtil.isArrayType(argTypes[0]);
+        return arguments.size() == 1 && HiveFunctionUtil.isArrayType(arguments.getDataType(0));
     }
 
     private static boolean isPrimitiveArray(DataType dataType) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
index 4411b088be3..60f5cda3283 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserCalcitePlanner.java
@@ -50,6 +50,7 @@ import org.apache.flink.table.planner.delegation.hive.copy.HiveParserWindowingSp
 import org.apache.flink.table.planner.delegation.hive.parse.HiveASTParser;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateViewInfo;
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.plan.FlinkCalciteCatalogReader;
 import org.apache.flink.table.planner.plan.nodes.hive.LogicalDistribution;
 import org.apache.flink.table.types.DataType;
@@ -98,7 +99,6 @@ import org.apache.calcite.sql.SqlAggFunction;
 import org.apache.calcite.sql.SqlOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeName;
-import org.apache.calcite.sql.validate.SqlUserDefinedTableFunction;
 import org.apache.calcite.sql2rel.DeduplicateCorrelateVariables;
 import org.apache.calcite.tools.FrameworkConfig;
 import org.apache.calcite.util.CompositeList;
@@ -2539,9 +2539,9 @@ public class HiveParserCalcitePlanner {
 
         SqlOperator convertedOperator = convertedCall.getOperator();
         Preconditions.checkState(
-                convertedOperator instanceof SqlUserDefinedTableFunction,
+                convertedOperator instanceof BridgingSqlFunction,
                 "Expect operator to be "
-                        + SqlUserDefinedTableFunction.class.getSimpleName()
+                        + BridgingSqlFunction.class.getSimpleName()
                         + ", actually got "
                         + convertedOperator.getClass().getSimpleName());
 
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
index 073afb2d956..ae682206abe 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserUtils.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.catalog.hive.util.HiveReflectionUtils;
 import org.apache.flink.table.catalog.hive.util.HiveTypeUtil;
 import org.apache.flink.table.functions.FunctionKind;
 import org.apache.flink.table.functions.hive.HiveGenericUDAF;
-import org.apache.flink.table.functions.hive.HiveGenericUDTF;
 import org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.delegation.hive.copy.HiveASTParseDriver;
@@ -48,7 +47,6 @@ import org.apache.flink.table.planner.delegation.hive.parse.HiveParserCreateView
 import org.apache.flink.table.planner.delegation.hive.parse.HiveParserErrorMsg;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
-import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.runtime.types.ClassLogicalTypeConverter;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -891,18 +889,6 @@ public class HiveParserUtils {
                 || sqlOperator instanceof HiveAggSqlFunction) {
             SqlReturnTypeInference returnTypeInference = sqlOperator.getReturnTypeInference();
             return returnTypeInference.inferReturnType(operatorBinding);
-        } else if (sqlOperator instanceof HiveTableSqlFunction) {
-            HiveGenericUDTF hiveGenericUDTF =
-                    (HiveGenericUDTF)
-                            ((HiveTableSqlFunction) sqlOperator)
-                                    .makeFunction(new Object[0], new LogicalType[0]);
-            DataType dataType =
-                    hiveGenericUDTF.getHiveResultType(
-                            operatorBinding.getConstantOperands(),
-                            types.stream()
-                                    .map(HiveParserUtils::toDataType)
-                                    .toArray(DataType[]::new));
-            return toRelDataType(dataType, dataTypeFactory);
         } else {
             throw new FlinkHiveException(
                     "Unsupported SqlOperator class " + sqlOperator.getClass().getName());
@@ -918,14 +904,6 @@ public class HiveParserUtils {
                 dataTypeFactory);
     }
 
-    public static RelDataType toRelDataType(DataType dataType, RelDataTypeFactory dtFactory) {
-        try {
-            return toRelDataType(HiveTypeUtil.toHiveTypeInfo(dataType, false), dtFactory);
-        } catch (SemanticException e) {
-            throw new FlinkHiveException(e);
-        }
-    }
-
     public static DataType toDataType(RelDataType relDataType) {
         return HiveTypeUtil.toFlinkType(HiveParserTypeConverter.convert(relDataType));
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
index 56ca2b52ffd..caa8df47a21 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/copy/HiveParserTypeConverter.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
@@ -68,7 +69,12 @@ public class HiveParserTypeConverter {
 
         for (ColumnInfo ci : rs.getSignature()) {
             if (neededCols == null || neededCols.contains(ci.getInternalName())) {
-                fieldTypes.add(convert(ci.getType(), dtFactory));
+                RelDataType relDataType = convert(ci.getType(), dtFactory);
+                // if the type is struct, we set it nullable
+                if (ci.getType().getCategory() == ObjectInspector.Category.STRUCT) {
+                    relDataType = dtFactory.createTypeWithNullability(relDataType, true);
+                }
+                fieldTypes.add(relDataType);
                 fieldNames.add(ci.getInternalName());
             }
         }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
index 8ea26b0d0e9..0df52e6b191 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/TableEnvHiveConnectorITCase.java
@@ -261,6 +261,8 @@ public class TableEnvHiveConnectorITCase {
             tableEnv.executeSql("create table db1.nested (a array<map<int, string>>)");
             tableEnv.executeSql(
                     "create function hiveudtf as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode'");
+            tableEnv.executeSql(
+                    "create function json_tuple as 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFJSONTuple'");
             HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "simple")
                     .addRow(new Object[] {3, Arrays.asList(1, 2, 3)})
                     .commit();
@@ -287,7 +289,14 @@ public class TableEnvHiveConnectorITCase {
                                     .execute()
                                     .collect());
             assertThat(results.toString()).isEqualTo("[+I[{1=a, 2=b}], +I[{3=c}]]");
-
+            results =
+                    CollectionUtil.iteratorToList(
+                            tableEnv.sqlQuery(
+                                            "select foo.i, b.role_id from db1.simple foo,"
+                                                    + " lateral table(json_tuple('{\"a\": \"0\", \"b\": \"1\"}', 'a')) as b(role_id)")
+                                    .execute()
+                                    .collect());
+            assertThat(results.toString()).isEqualTo("[+I[3, 0]]");
             tableEnv.executeSql("create table db1.ts (a array<timestamp>)");
             HiveTestUtils.createTextTableInserter(hiveCatalog, "db1", "ts")
                     .addRow(
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index cabc275dee5..1b1dd03c884 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -21,11 +21,10 @@ package org.apache.flink.table.functions.hive;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.functions.hive.HiveSimpleUDFTest.HiveUDFCallContext;
 import org.apache.flink.table.functions.hive.util.TestGenericUDFArray;
 import org.apache.flink.table.functions.hive.util.TestGenericUDFStructSize;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.ql.udf.UDFUnhex;
@@ -45,7 +44,11 @@ import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.flink.table.HiveVersionTestUtil.HIVE_230_OR_LATER;
 import static org.apache.flink.table.HiveVersionTestUtil.HIVE_310_OR_LATER;
@@ -53,7 +56,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link HiveGenericUDF}. */
 public class HiveGenericUDFTest {
-    private static HiveShim hiveShim = HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
+    private static final HiveShim hiveShim =
+            HiveShimLoader.loadHiveShim(HiveShimLoader.getHiveVersion());
 
     @Test
     public void testAbs() {
@@ -287,7 +291,12 @@ public class HiveGenericUDFTest {
         HiveGenericUDF udf =
                 new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
 
-        CallContext callContext = new HiveUDFCallContext(constantArgs, argTypes);
+        CallContextMock callContext = new CallContextMock();
+        callContext.argumentDataTypes = Arrays.asList(argTypes);
+        callContext.argumentValues =
+                Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
+        callContext.argumentLiterals =
+                Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
         udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
 
         udf.open(null);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
index 2819fadff76..b82fd4b5f9e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.functions.hive.conversion.HiveInspectors;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
 import org.apache.flink.types.Row;
 
 import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
@@ -43,6 +44,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -147,13 +151,20 @@ public class HiveGenericUDTFTest {
             Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
         HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
 
-        HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim);
+        CallContextMock callContext = new CallContextMock();
+        callContext.argumentDataTypes = Arrays.asList(argTypes);
+        callContext.argumentValues =
+                Arrays.stream(constantArgs).map(Optional::ofNullable).collect(Collectors.toList());
+        callContext.argumentLiterals =
+                Arrays.stream(constantArgs).map(Objects::nonNull).collect(Collectors.toList());
 
-        udf.setArgumentTypesAndConstants(constantArgs, argTypes);
-        udf.getHiveResultType(constantArgs, argTypes);
+        HiveGenericUDTF udf = new HiveGenericUDTF(wrapper, hiveShim);
+        udf.setArguments(callContext);
+        udf.inferReturnType();
 
         ObjectInspector[] argumentInspectors =
-                HiveInspectors.toInspectors(hiveShim, constantArgs, argTypes);
+                HiveInspectors.getArgInspectors(
+                        hiveShim, HiveFunctionArguments.create(callContext));
         ObjectInspector returnInspector = wrapper.createFunction().initialize(argumentInspectors);
 
         udf.open(null);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index a0d74203673..5a4e864b5b9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -19,13 +19,11 @@
 package org.apache.flink.table.functions.hive;
 
 import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
-import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.util.TestHiveUDFArray;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
 
 import org.apache.hadoop.hive.ql.exec.UDF;
 import org.apache.hadoop.hive.ql.udf.UDFBase64;
@@ -44,8 +42,7 @@ import java.math.BigDecimal;
 import java.sql.Date;
 import java.sql.Timestamp;
 import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
+import java.util.Collections;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -222,7 +219,10 @@ public class HiveSimpleUDFTest {
                 new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
 
         // Hive UDF won't have literal args
-        CallContext callContext = new HiveUDFCallContext(new Object[0], argTypes);
+        CallContextMock callContext = new CallContextMock();
+        callContext.argumentDataTypes = Arrays.asList(argTypes);
+        callContext.argumentLiterals = Arrays.asList(new Boolean[argTypes.length]);
+        Collections.fill(callContext.argumentLiterals, false);
         udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
 
         udf.open(null);
@@ -257,61 +257,4 @@ public class HiveSimpleUDFTest {
             return content;
         }
     }
-
-    /** A CallContext implementation for Hive UDF tests. */
-    public static class HiveUDFCallContext implements CallContext {
-
-        private final Object[] constantArgs;
-        private final DataType[] argTypes;
-
-        public HiveUDFCallContext(Object[] constantArgs, DataType[] argTypes) {
-            this.constantArgs = constantArgs;
-            this.argTypes = argTypes;
-        }
-
-        @Override
-        public DataTypeFactory getDataTypeFactory() {
-            return null;
-        }
-
-        @Override
-        public FunctionDefinition getFunctionDefinition() {
-            return null;
-        }
-
-        @Override
-        public boolean isArgumentLiteral(int pos) {
-            return pos >= 0 && pos < constantArgs.length && constantArgs[pos] != null;
-        }
-
-        @Override
-        public boolean isArgumentNull(int pos) {
-            return false;
-        }
-
-        @Override
-        public <T> Optional<T> getArgumentValue(int pos, Class<T> clazz) {
-            return (Optional<T>) Optional.of(constantArgs[pos]);
-        }
-
-        @Override
-        public String getName() {
-            return null;
-        }
-
-        @Override
-        public List<DataType> getArgumentDataTypes() {
-            return Arrays.asList(argTypes);
-        }
-
-        @Override
-        public Optional<DataType> getOutputDataType() {
-            return Optional.empty();
-        }
-
-        @Override
-        public boolean isGroupedAggregation() {
-            return false;
-        }
-    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
index 6567c22702d..f7933d9dc5d 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java
@@ -23,10 +23,9 @@ import org.apache.flink.table.catalog.hive.HiveTestUtils;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.hive.HiveSimpleUDF;
-import org.apache.flink.table.functions.hive.HiveSimpleUDFTest.HiveUDFCallContext;
 import org.apache.flink.table.module.CoreModule;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.utils.CallContextMock;
 import org.apache.flink.table.utils.LegacyRowResource;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
@@ -34,6 +33,8 @@ import org.apache.flink.util.CollectionUtil;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.table.catalog.hive.client.HiveShimLoader.HIVE_VERSION_V2_3_9;
@@ -90,7 +91,10 @@ public class HiveModuleTest {
 
         DataType[] inputType = new DataType[] {DataTypes.STRING()};
 
-        CallContext callContext = new HiveUDFCallContext(new Object[0], inputType);
+        CallContextMock callContext = new CallContextMock();
+        callContext.argumentDataTypes = Arrays.asList(inputType);
+        callContext.argumentLiterals = Arrays.asList(new Boolean[inputType.length]);
+        Collections.fill(callContext.argumentLiterals, false);
         udf.getTypeInference(null).getOutputTypeStrategy().inferType(callContext);
 
         udf.open(null);
@@ -137,8 +141,6 @@ public class HiveModuleTest {
                                 .collect());
         assertThat(results.toString()).isEqualTo("[2018-01-192019-12-27 17:58:23.385]");
 
-        // TODO: null cannot be a constant argument at the moment. This test will make more sense
-        // when that changes.
         results =
                 CollectionUtil.iteratorToList(
                         tEnv.sqlQuery("select concat('ab',cast(null as int))").execute().collect());
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
index e0e638ba99e..6e8ed1a874b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/catalog/FunctionCatalogOperatorTable.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.planner.catalog;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.catalog.ContextResolvedFunction;
 import org.apache.flink.table.catalog.DataTypeFactory;
@@ -37,14 +36,10 @@ import org.apache.flink.table.planner.calcite.RexFactory;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
 import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
 import org.apache.flink.table.planner.functions.utils.HiveAggSqlFunction;
-import org.apache.flink.table.planner.functions.utils.HiveTableSqlFunction;
 import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
-import org.apache.flink.table.planner.plan.schema.DeferredTypeFlinkTableFunction;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeInference;
 import org.apache.flink.table.types.inference.TypeStrategies;
 import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
 
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
@@ -61,7 +56,6 @@ import java.util.List;
 import java.util.Optional;
 
 import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.isHiveFunc;
-import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /** Thin adapter between {@link SqlOperatorTable} and {@link FunctionCatalog}. */
 @Internal
@@ -123,22 +117,7 @@ public class FunctionCatalogOperatorTable implements SqlOperatorTable {
         } else if (definition instanceof TableFunctionDefinition
                 && category != null
                 && category.isTableFunction()) {
-            TableFunctionDefinition def = (TableFunctionDefinition) definition;
-            if (isHiveFunc(def.getTableFunction())) {
-                DataType returnType = fromLegacyInfoToDataType(new GenericTypeInfo<>(Row.class));
-                return Optional.of(
-                        new HiveTableSqlFunction(
-                                identifier,
-                                def.getTableFunction(),
-                                returnType,
-                                typeFactory,
-                                new DeferredTypeFlinkTableFunction(
-                                        def.getTableFunction(), returnType),
-                                HiveTableSqlFunction.operandTypeChecker(
-                                        identifier.toString(), def.getTableFunction())));
-            } else {
-                return convertTableFunction(identifier, (TableFunctionDefinition) definition);
-            }
+            return convertTableFunction(identifier, (TableFunctionDefinition) definition);
         }
         // new stack
         return convertToBridgingSqlFunction(category, resolvedFunction);
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
deleted file mode 100644
index 984c3aa7123..00000000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/utils/HiveTableSqlFunction.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.planner.functions.utils;
-
-import org.apache.flink.table.functions.FunctionIdentifier;
-import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
-import org.apache.flink.table.planner.plan.schema.FlinkTableFunction;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.utils.DateTimeUtils;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rel.type.RelDataTypeFactory;
-import org.apache.calcite.sql.SqlCallBinding;
-import org.apache.calcite.util.BitString;
-import org.apache.calcite.util.DateString;
-import org.apache.calcite.util.NlsString;
-import org.apache.calcite.util.Pair;
-import org.apache.calcite.util.TimeString;
-import org.apache.calcite.util.TimestampString;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.time.LocalTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Collectors;
-
-import scala.Tuple3;
-
-import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeGetResultType;
-import static org.apache.flink.table.planner.functions.utils.HiveFunctionUtils.invokeSetArgs;
-import static org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.buildRelDataType;
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
-import static org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromLogicalTypeToDataType;
-
-/**
- * Hive {@link TableSqlFunction}. Override getFunction to clone function and invoke {@code
- * HiveGenericUDTF#setArgumentTypesAndConstants}. Override SqlReturnTypeInference to invoke {@code
- * HiveGenericUDTF#getHiveResultType} instead of {@code HiveGenericUDTF#getResultType(Object[],
- * Class[])}.
- *
- * @deprecated TODO hack code, its logical should be integrated to TableSqlFunction
- */
-@Deprecated
-public class HiveTableSqlFunction extends TableSqlFunction {
-
-    private final TableFunction hiveUdtf;
-    private final HiveOperandTypeChecker operandTypeChecker;
-
-    public HiveTableSqlFunction(
-            FunctionIdentifier identifier,
-            TableFunction hiveUdtf,
-            DataType implicitResultType,
-            FlinkTypeFactory typeFactory,
-            FlinkTableFunction functionImpl,
-            HiveOperandTypeChecker operandTypeChecker) {
-        super(
-                identifier,
-                identifier.toString(),
-                hiveUdtf,
-                implicitResultType,
-                typeFactory,
-                functionImpl,
-                scala.Option.apply(operandTypeChecker));
-        this.hiveUdtf = hiveUdtf;
-        this.operandTypeChecker = operandTypeChecker;
-    }
-
-    @Override
-    public TableFunction makeFunction(Object[] constantArguments, LogicalType[] argTypes) {
-        TableFunction clone;
-        try {
-            clone = InstantiationUtil.clone(hiveUdtf);
-        } catch (IOException | ClassNotFoundException e) {
-            throw new RuntimeException(e);
-        }
-        return (TableFunction) invokeSetArgs(clone, constantArguments, argTypes);
-    }
-
-    @Override
-    public RelDataType getRowType(RelDataTypeFactory typeFactory, List<Object> arguments) {
-        Preconditions.checkNotNull(operandTypeChecker.previousArgTypes);
-        FlinkTypeFactory factory = (FlinkTypeFactory) typeFactory;
-        Object[] argumentsArray =
-                convertArguments(
-                        Arrays.stream(operandTypeChecker.previousArgTypes)
-                                .map(factory::createFieldTypeFromLogicalType)
-                                .collect(Collectors.toList()),
-                        arguments);
-        DataType resultType =
-                fromLogicalTypeToDataType(
-                        FlinkTypeFactory.toLogicalType(
-                                invokeGetResultType(
-                                        hiveUdtf,
-                                        argumentsArray,
-                                        operandTypeChecker.previousArgTypes,
-                                        (FlinkTypeFactory) typeFactory)));
-        Tuple3<String[], int[], LogicalType[]> fieldInfo =
-                UserDefinedFunctionUtils.getFieldInfo(resultType);
-        return buildRelDataType(
-                typeFactory, fromDataTypeToLogicalType(resultType), fieldInfo._1(), fieldInfo._2());
-    }
-
-    /**
-     * This method is copied from calcite, and modify it to not rely on Function. TODO
-     * FlinkTableFunction need implement getElementType.
-     */
-    private static Object[] convertArguments(
-            List<RelDataType> operandTypes, List<Object> arguments0) {
-        List<Object> arguments = new ArrayList<>(arguments0.size());
-        // Construct a list of arguments, if they are all constants.
-        for (Pair<RelDataType, Object> pair : Pair.zip(operandTypes, arguments0)) {
-            arguments.add(coerce(pair.right, pair.left));
-        }
-        return arguments.toArray();
-    }
-
-    private static Object coerce(Object value, RelDataType type) {
-        if (value == null) {
-            return null;
-        }
-        switch (type.getSqlTypeName()) {
-            case CHAR:
-                return ((NlsString) value).getValue();
-            case BINARY:
-                return ((BitString) value).getAsByteArray();
-            case DECIMAL:
-                return value;
-            case BIGINT:
-                return ((BigDecimal) value).longValue();
-            case INTEGER:
-                return ((BigDecimal) value).intValue();
-            case SMALLINT:
-                return ((BigDecimal) value).shortValue();
-            case TINYINT:
-                return ((BigDecimal) value).byteValue();
-            case DOUBLE:
-                return ((BigDecimal) value).doubleValue();
-            case REAL:
-                return ((BigDecimal) value).floatValue();
-            case FLOAT:
-                return ((BigDecimal) value).floatValue();
-            case DATE:
-                return LocalDate.ofEpochDay(((DateString) value).getDaysSinceEpoch());
-            case TIME:
-                return LocalTime.ofNanoOfDay(((TimeString) value).getMillisOfDay() * 1000_000);
-            case TIMESTAMP:
-                return DateTimeUtils.toLocalDateTime(
-                        ((TimestampString) value).getMillisSinceEpoch());
-            default:
-                throw new RuntimeException("Not support type: " + type);
-        }
-    }
-
-    public static HiveOperandTypeChecker operandTypeChecker(String name, TableFunction udtf) {
-        return new HiveOperandTypeChecker(
-                name, udtf, UserDefinedFunctionUtils.checkAndExtractMethods(udtf, "eval"));
-    }
-
-    /**
-     * Checker for remember previousArgTypes.
-     *
-     * @deprecated TODO hack code, should modify calcite getRowType to pass operand types
-     */
-    @Deprecated
-    public static class HiveOperandTypeChecker extends OperandMetadata {
-
-        private LogicalType[] previousArgTypes;
-
-        private HiveOperandTypeChecker(String name, TableFunction udtf, Method[] methods) {
-            super(name, udtf, methods);
-        }
-
-        @Override
-        public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFailure) {
-            previousArgTypes = UserDefinedFunctionUtils.getOperandTypeArray(callBinding);
-            return super.checkOperandTypes(callBinding, throwOnFailure);
-        }
-    }
-}