Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/23 04:43:43 UTC

[flink] branch master updated (fc533b9d9c1 -> 9a4fd227a58)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from fc533b9d9c1 [FLINK-26047][yarn] Support remote usrlib in HDFS for YARN deployment
     new e13ae26e806 [FLINK-28451][table][hive] Use UserCodeClassloader instead of the current thread's classloader to load function
     new 9a4fd227a58 [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../factories/HiveFunctionDefinitionFactory.java   | 66 ++++++++++-----
 .../flink/table/functions/hive/HiveFunction.java   |  2 +-
 .../table/functions/hive/HiveFunctionWrapper.java  | 97 ++++++++++++++--------
 .../table/functions/hive/HiveGenericUDAF.java      |  7 +-
 .../flink/table/functions/hive/HiveGenericUDF.java |  6 +-
 .../table/functions/hive/HiveGenericUDTF.java      |  2 +-
 .../table/functions/hive/HiveScalarFunction.java   | 16 ++--
 .../flink/table/functions/hive/HiveSimpleUDF.java  |  6 +-
 .../apache/flink/table/module/hive/HiveModule.java | 18 ++--
 .../flink/table/module/hive/HiveModuleFactory.java |  2 +-
 .../table/planner/delegation/hive/HiveParser.java  |  3 +-
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  | 11 ++-
 .../functions/hive/HiveFunctionWrapperTest.java    | 91 ++++++++++++++++++++
 .../table/functions/hive/HiveGenericUDAFTest.java  |  3 +-
 .../table/functions/hive/HiveGenericUDFTest.java   |  5 +-
 .../table/functions/hive/HiveGenericUDTFTest.java  |  4 +-
 .../table/functions/hive/HiveSimpleUDFTest.java    |  5 +-
 .../client/gateway/context/ExecutionContext.java   | 11 ---
 .../table/client/gateway/local/LocalExecutor.java  | 11 +--
 .../flink/table/catalog/FunctionCatalog.java       |  7 +-
 .../table/factories/FunctionDefinitionFactory.java | 38 ++++++++-
 .../factories/TestFunctionDefinitionFactory.java}  | 27 +++---
 .../table/planner/factories/TestValuesCatalog.java |  8 +-
 .../table/planner/catalog/CatalogTableITCase.scala | 40 ++++++++-
 24 files changed, 353 insertions(+), 133 deletions(-)
 create mode 100644 flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveFunctionWrapperTest.java
 copy flink-table/{flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java => flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFunctionDefinitionFactory.java} (54%)


[flink] 02/02: [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a4fd227a5824e707f4dbe52773a260fbb89854a
Author: Jark Wu <ja...@apache.org>
AuthorDate: Fri Jul 22 18:43:23 2022 +0800

    [FLINK-28451][hive] Borrow Kryo from SerializationUtilities in HiveFunctionWrapper
    
    This closes #20211
---
 .../factories/HiveFunctionDefinitionFactory.java   |  4 +-
 .../table/functions/hive/HiveFunctionWrapper.java  | 56 ++++++++++++++--------
 .../apache/flink/table/module/hive/HiveModule.java |  2 +-
 .../client/gateway/context/ExecutionContext.java   |  2 -
 4 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 62458186f60..d3cb9f39763 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -71,9 +71,9 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
     }
 
     /**
-     * Distinguish if the function is a generic function.
+     * Distinguish if the function is a Flink function.
      *
-     * @return whether the function is a generic function
+     * @return whether the function is a Flink function
      */
     private boolean isFlinkFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {
         if (catalogFunction.getFunctionLanguage() == FunctionLanguage.PYTHON) {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
index eebd1e2153f..9c2d8053018 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
@@ -30,6 +30,9 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
 
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.borrowKryo;
+import static org.apache.hadoop.hive.ql.exec.SerializationUtilities.releaseKryo;
+
 /**
  * A wrapper of Hive functions that instantiate function instances and ser/de function instance
  * cross process boundary.
@@ -50,6 +53,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
 
     private transient UDFType instance = null;
 
+    @SuppressWarnings("unchecked")
     public HiveFunctionWrapper(Class<?> functionClz) {
         this.functionClz = (Class<UDFType>) functionClz;
     }
@@ -75,6 +79,38 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
         this.udfSerializedBytes = serializeObjectToKryo((Serializable) serializableInstance);
     }
 
+    private static byte[] serializeObjectToKryo(Serializable object) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        Output output = new Output(baos);
+        Kryo kryo = borrowKryo();
+        try {
+            kryo.writeObject(output, object);
+        } finally {
+            releaseKryo(kryo);
+        }
+        output.close();
+        return baos.toByteArray();
+    }
+
+    private static <T extends Serializable> T deserializeObjectFromKryo(
+            byte[] bytes, Class<T> clazz) {
+        Input inp = new Input(new ByteArrayInputStream(bytes));
+        Kryo kryo = borrowKryo();
+        ClassLoader oldClassLoader = kryo.getClassLoader();
+        kryo.setClassLoader(clazz.getClassLoader());
+        T func;
+
+        try {
+            func = kryo.readObject(inp, clazz);
+        } finally {
+            kryo.setClassLoader(oldClassLoader);
+            releaseKryo(kryo);
+        }
+
+        inp.close();
+        return func;
+    }
+
     /**
      * Instantiate a Hive function instance.
      *
@@ -124,27 +160,9 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
      *
      * @return the UDF deserialized
      */
+    @SuppressWarnings("unchecked")
     private UDFType deserializeUDF() {
         return (UDFType)
                 deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass());
     }
-
-    private static byte[] serializeObjectToKryo(Serializable object) {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        Output output = new Output(baos);
-        Kryo kryo = new Kryo();
-        kryo.writeObject(output, object);
-        output.close();
-        return baos.toByteArray();
-    }
-
-    private static <T extends Serializable> T deserializeObjectFromKryo(
-            byte[] bytes, Class<T> clazz) {
-        Input inp = new Input(new ByteArrayInputStream(bytes));
-        Kryo kryo = new Kryo();
-        kryo.setClassLoader(clazz.getClassLoader());
-        T func = kryo.readObject(inp, clazz);
-        inp.close();
-        return func;
-    }
 }
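
[Editor's note] The hunk above replaces a per-call "new Kryo()" with Hive's pooled instances. Borrowing from SerializationUtilities amortizes the cost of constructing and configuring Kryo and picks up Hive's registered serializers; the trade-off is that a borrowed instance is shared state, so the classloader must be restored and the instance released in a finally block. Below is a minimal standalone sketch of the same round-trip pattern, assuming Hive's shaded Kryo is on the classpath; KryoRoundTrip is an illustrative helper, not part of the commit:

    import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
    import org.apache.hive.com.esotericsoftware.kryo.Kryo;
    import org.apache.hive.com.esotericsoftware.kryo.io.Input;
    import org.apache.hive.com.esotericsoftware.kryo.io.Output;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.Serializable;

    final class KryoRoundTrip {
        static <T extends Serializable> T roundTrip(T object, Class<T> clazz) {
            // Serialize with a borrowed Kryo; always return it to the pool.
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Kryo kryo = SerializationUtilities.borrowKryo();
            try (Output output = new Output(baos)) {
                kryo.writeObject(output, object);
            } finally {
                SerializationUtilities.releaseKryo(kryo);
            }
            // Deserialize with a second borrowed instance; scope the target
            // classloader so the pooled instance is returned unchanged.
            kryo = SerializationUtilities.borrowKryo();
            ClassLoader original = kryo.getClassLoader();
            kryo.setClassLoader(clazz.getClassLoader());
            try (Input input = new Input(new ByteArrayInputStream(baos.toByteArray()))) {
                return kryo.readObject(input, clazz);
            } finally {
                kryo.setClassLoader(original);
                SerializationUtilities.releaseKryo(kryo);
            }
        }
    }
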
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index 562e053f1c8..f798a8a4543 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -140,7 +140,7 @@ public class HiveModule implements Module {
         if (name.equalsIgnoreCase("internal_interval")) {
             return Optional.of(
                     factory.createFunctionDefinitionFromHiveFunction(
-                            name, HiveGenericUDFInternalInterval.class.getName()));
+                            name, HiveGenericUDFInternalInterval.class.getName(), context));
         }
 
         Optional<FunctionInfo> info = hiveShim.getBuiltInFunctionInfo(name);
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 67b796de25b..c2bf0b1d725 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -36,10 +36,8 @@ import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.util.MutableURLClassLoader;
-import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import java.lang.reflect.Method;
-import java.util.function.Supplier;
 
 import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
 


[flink] 01/02: [FLINK-28451][table][hive] Use UserCodeClassloader instead of the current thread's classloader to load function

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e13ae26e806eff403787d5b75e90008bcc13d8dc
Author: luoyuxia <lu...@alumni.sjtu.edu.cn>
AuthorDate: Fri Jul 8 11:42:32 2022 +0800

    [FLINK-28451][table][hive] Use UserCodeClassloader instead of the current thread's classloader to load function
    
    This closes #20211
---
 .../factories/HiveFunctionDefinitionFactory.java   | 66 ++++++++++------
 .../flink/table/functions/hive/HiveFunction.java   |  2 +-
 .../table/functions/hive/HiveFunctionWrapper.java  | 79 +++++++++++--------
 .../table/functions/hive/HiveGenericUDAF.java      |  7 +-
 .../flink/table/functions/hive/HiveGenericUDF.java |  6 +-
 .../table/functions/hive/HiveGenericUDTF.java      |  2 +-
 .../table/functions/hive/HiveScalarFunction.java   | 16 ++--
 .../flink/table/functions/hive/HiveSimpleUDF.java  |  6 +-
 .../apache/flink/table/module/hive/HiveModule.java | 16 +++-
 .../flink/table/module/hive/HiveModuleFactory.java |  2 +-
 .../table/planner/delegation/hive/HiveParser.java  |  3 +-
 .../hive/parse/HiveParserDDLSemanticAnalyzer.java  | 11 ++-
 .../functions/hive/HiveFunctionWrapperTest.java    | 91 ++++++++++++++++++++++
 .../table/functions/hive/HiveGenericUDAFTest.java  |  3 +-
 .../table/functions/hive/HiveGenericUDFTest.java   |  5 +-
 .../table/functions/hive/HiveGenericUDTFTest.java  |  4 +-
 .../table/functions/hive/HiveSimpleUDFTest.java    |  5 +-
 .../client/gateway/context/ExecutionContext.java   |  9 ---
 .../table/client/gateway/local/LocalExecutor.java  | 11 +--
 .../flink/table/catalog/FunctionCatalog.java       |  7 +-
 .../table/factories/FunctionDefinitionFactory.java | 38 ++++++++-
 .../factories/TestFunctionDefinitionFactory.java}  | 27 +++----
 .../table/planner/factories/TestValuesCatalog.java |  8 +-
 .../table/planner/catalog/CatalogTableITCase.scala | 40 +++++++++-
 24 files changed, 334 insertions(+), 130 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
index 934f77902bd..62458186f60 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/factories/HiveFunctionDefinitionFactory.java
@@ -21,9 +21,11 @@ package org.apache.flink.table.catalog.hive.factories;
 import org.apache.flink.connectors.hive.HiveTableFactory;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.functions.hive.HiveFunctionWrapper;
 import org.apache.flink.table.functions.hive.HiveGenericUDAF;
@@ -54,17 +56,39 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
 
     @Override
     public FunctionDefinition createFunctionDefinition(
-            String name, CatalogFunction catalogFunction) {
-        if (catalogFunction.isGeneric()) {
-            return createFunctionDefinitionFromFlinkFunction(name, catalogFunction);
+            String name, CatalogFunction catalogFunction, Context context) {
+        if (isFlinkFunction(catalogFunction, context.getClassLoader())) {
+            return createFunctionDefinitionFromFlinkFunction(name, catalogFunction, context);
         }
-        return createFunctionDefinitionFromHiveFunction(name, catalogFunction.getClassName());
+        return createFunctionDefinitionFromHiveFunction(
+                name, catalogFunction.getClassName(), context);
     }
 
     public FunctionDefinition createFunctionDefinitionFromFlinkFunction(
-            String name, CatalogFunction catalogFunction) {
+            String name, CatalogFunction catalogFunction, Context context) {
         return UserDefinedFunctionHelper.instantiateFunction(
-                Thread.currentThread().getContextClassLoader(), null, name, catalogFunction);
+                context.getClassLoader(), null, name, catalogFunction);
+    }
+
+    /**
+     * Distinguish if the function is a generic function.
+     *
+     * @return whether the function is a generic function
+     */
+    private boolean isFlinkFunction(CatalogFunction catalogFunction, ClassLoader classLoader) {
+        if (catalogFunction.getFunctionLanguage() == FunctionLanguage.PYTHON) {
+            return true;
+        }
+        try {
+            Class<?> c = Class.forName(catalogFunction.getClassName(), true, classLoader);
+            if (UserDefinedFunction.class.isAssignableFrom(c)) {
+                return true;
+            }
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException(
+                    String.format("Can't resolve udf class %s", catalogFunction.getClassName()), e);
+        }
+        return false;
     }
 
     /**
@@ -72,10 +96,10 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
      * org.apache.flink.table.module.hive.HiveModule}.
      */
     public FunctionDefinition createFunctionDefinitionFromHiveFunction(
-            String name, String functionClassName) {
-        Class<?> clazz;
+            String name, String functionClassName, Context context) {
+        Class<?> functionClz;
         try {
-            clazz = Thread.currentThread().getContextClassLoader().loadClass(functionClassName);
+            functionClz = context.getClassLoader().loadClass(functionClassName);
 
             LOG.info("Successfully loaded Hive udf '{}' with class '{}'", name, functionClassName);
         } catch (ClassNotFoundException e) {
@@ -84,33 +108,31 @@ public class HiveFunctionDefinitionFactory implements FunctionDefinitionFactory
                     e);
         }
 
-        if (UDF.class.isAssignableFrom(clazz)) {
+        if (UDF.class.isAssignableFrom(functionClz)) {
             LOG.info("Transforming Hive function '{}' into a HiveSimpleUDF", name);
 
-            return new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-        } else if (GenericUDF.class.isAssignableFrom(clazz)) {
+            return new HiveSimpleUDF(new HiveFunctionWrapper<>(functionClz), hiveShim);
+        } else if (GenericUDF.class.isAssignableFrom(functionClz)) {
             LOG.info("Transforming Hive function '{}' into a HiveGenericUDF", name);
 
-            return new HiveGenericUDF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-        } else if (GenericUDTF.class.isAssignableFrom(clazz)) {
+            return new HiveGenericUDF(new HiveFunctionWrapper<>(functionClz), hiveShim);
+        } else if (GenericUDTF.class.isAssignableFrom(functionClz)) {
             LOG.info("Transforming Hive function '{}' into a HiveGenericUDTF", name);
-            return new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClassName), hiveShim);
-        } else if (GenericUDAFResolver2.class.isAssignableFrom(clazz)
-                || UDAF.class.isAssignableFrom(clazz)) {
+            return new HiveGenericUDTF(new HiveFunctionWrapper<>(functionClz), hiveShim);
+        } else if (GenericUDAFResolver2.class.isAssignableFrom(functionClz)
+                || UDAF.class.isAssignableFrom(functionClz)) {
 
-            if (GenericUDAFResolver2.class.isAssignableFrom(clazz)) {
+            if (GenericUDAFResolver2.class.isAssignableFrom(functionClz)) {
                 LOG.info(
                         "Transforming Hive function '{}' into a HiveGenericUDAF without UDAF bridging",
                         name);
-                return new HiveGenericUDAF(
-                        new HiveFunctionWrapper<>(functionClassName), false, hiveShim);
+                return new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClz), false, hiveShim);
             } else {
                 LOG.info(
                         "Transforming Hive function '{}' into a HiveGenericUDAF with UDAF bridging",
                         name);
 
-                return new HiveGenericUDAF(
-                        new HiveFunctionWrapper<>(functionClassName), true, hiveShim);
+                return new HiveGenericUDAF(new HiveFunctionWrapper<>(functionClz), true, hiveShim);
             }
         } else {
             throw new IllegalArgumentException(
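
[Editor's note] The new isFlinkFunction check above replaces CatalogFunction#isGeneric: a function counts as a Flink function if it is a Python function, or if its class, resolved through the session classloader rather than the thread-context one, is assignable to UserDefinedFunction. A minimal sketch of the same classification, assuming the class name is resolvable from the given loader (UdfClassifier is an illustrative name):

    import org.apache.flink.table.functions.UserDefinedFunction;

    final class UdfClassifier {
        // Loads the class through the supplied classloader and tests whether
        // it is a Flink UDF (as opposed to a Hive UDF/GenericUDF/UDAF/UDTF).
        static boolean isFlinkUdf(String className, ClassLoader classLoader)
                throws ClassNotFoundException {
            Class<?> c = Class.forName(className, true, classLoader);
            return UserDefinedFunction.class.isAssignableFrom(c);
        }
    }
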
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
index c51a3c78370..c37f9e6ac2a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunction.java
@@ -86,7 +86,7 @@ public interface HiveFunction<UDFType> {
                 if (throwOnFailure) {
                     throw callContext.newValidationError(
                             "Cannot find a suitable Hive function from %s for the input arguments",
-                            hiveFunction.getFunctionWrapper().getClassName());
+                            hiveFunction.getFunctionWrapper().getUDFClassName());
                 } else {
                     return Optional.empty();
                 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
index a53d99c2069..eebd1e2153f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveFunctionWrapper.java
@@ -21,9 +21,13 @@ package org.apache.flink.table.functions.hive;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.Preconditions;
 
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hive.com.esotericsoftware.kryo.Kryo;
+import org.apache.hive.com.esotericsoftware.kryo.io.Input;
+import org.apache.hive.com.esotericsoftware.kryo.io.Output;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
 
 /**
@@ -37,30 +41,30 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
 
     public static final long serialVersionUID = 393313529306818205L;
 
-    private final String className;
-    // a field to hold the string serialized for the UDF.
+    private final Class<UDFType> functionClz;
+    // a field to hold the bytes serialized for the UDF.
     // we sometimes need to hold it in case of some serializable UDF will contain
     // additional information such as Hive's GenericUDFMacro and if we construct the UDF directly by
     // getUDFClass#newInstance, the information will be missed.
-    private String udfSerializedString;
+    private byte[] udfSerializedBytes;
 
     private transient UDFType instance = null;
 
-    public HiveFunctionWrapper(String className) {
-        this.className = className;
+    public HiveFunctionWrapper(Class<?> functionClz) {
+        this.functionClz = (Class<UDFType>) functionClz;
     }
 
     /**
      * Create a HiveFunctionWrapper with a UDF instance. In this constructor, the instance will be
      * serialized to string and held on in the HiveFunctionWrapper.
      */
-    public HiveFunctionWrapper(String className, UDFType serializableInstance) {
-        this(className);
+    public HiveFunctionWrapper(Class<?> functionClz, UDFType serializableInstance) {
+        this(functionClz);
         Preconditions.checkArgument(
-                serializableInstance.getClass().getName().equals(className),
+                serializableInstance.getClass().getName().equals(getUDFClassName()),
                 String.format(
                         "Expect the UDF is instance of %s, but is instance of %s.",
-                        className, serializableInstance.getClass().getName()));
+                        getUDFClassName(), serializableInstance.getClass().getName()));
         Preconditions.checkArgument(
                 serializableInstance instanceof Serializable,
                 String.format(
@@ -68,8 +72,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
                         serializableInstance.getClass().getName()));
         // we need to use the SerializationUtilities#serializeObject to serialize UDF for the UDF
         // may not be serialized by Java serializer
-        this.udfSerializedString =
-                SerializationUtilities.serializeObject((Serializable) serializableInstance);
+        this.udfSerializedBytes = serializeObjectToKryo((Serializable) serializableInstance);
     }
 
     /**
@@ -78,7 +81,7 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
      * @return a Hive function instance
      */
     public UDFType createFunction() {
-        if (udfSerializedString != null) {
+        if (udfSerializedBytes != null) {
             // deserialize the string to udf instance
             return deserializeUDF();
         } else if (instance != null) {
@@ -86,10 +89,11 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
         } else {
             UDFType func;
             try {
-                func = getUDFClass().newInstance();
-            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                func = functionClz.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
                 throw new FlinkHiveUDFException(
-                        String.format("Failed to create function from %s", className), e);
+                        String.format("Failed to create function from %s", functionClz.getName()),
+                        e);
             }
 
             if (!(func instanceof UDF)) {
@@ -107,18 +111,12 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
      *
      * @return class name of the Hive function
      */
-    public String getClassName() {
-        return className;
+    public String getUDFClassName() {
+        return functionClz.getName();
     }
 
-    /**
-     * Get class of the Hive function.
-     *
-     * @return class of the Hive function
-     * @throws ClassNotFoundException thrown when the class is not found in classpath
-     */
-    public Class<UDFType> getUDFClass() throws ClassNotFoundException {
-        return (Class<UDFType>) Thread.currentThread().getContextClassLoader().loadClass(className);
+    public Class<UDFType> getUDFClass() {
+        return functionClz;
     }
 
     /**
@@ -127,13 +125,26 @@ public class HiveFunctionWrapper<UDFType> implements Serializable {
      * @return the UDF deserialized
      */
     private UDFType deserializeUDF() {
-        try {
-            return (UDFType)
-                    SerializationUtilities.deserializeObject(
-                            udfSerializedString, (Class<Serializable>) getUDFClass());
-        } catch (ClassNotFoundException e) {
-            throw new FlinkHiveUDFException(
-                    String.format("Failed to deserialize function %s.", className), e);
-        }
+        return (UDFType)
+                deserializeObjectFromKryo(udfSerializedBytes, (Class<Serializable>) getUDFClass());
+    }
+
+    private static byte[] serializeObjectToKryo(Serializable object) {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        Output output = new Output(baos);
+        Kryo kryo = new Kryo();
+        kryo.writeObject(output, object);
+        output.close();
+        return baos.toByteArray();
+    }
+
+    private static <T extends Serializable> T deserializeObjectFromKryo(
+            byte[] bytes, Class<T> clazz) {
+        Input inp = new Input(new ByteArrayInputStream(bytes));
+        Kryo kryo = new Kryo();
+        kryo.setClassLoader(clazz.getClassLoader());
+        T func = kryo.readObject(inp, clazz);
+        inp.close();
+        return func;
     }
 }
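
[Editor's note] With the constructor now taking a Class<?> instead of a class name, the wrapper pins the already-resolved class, so getUDFClass() no longer consults the thread-context classloader and no longer throws ClassNotFoundException. A hedged usage sketch of the new constructor; udfJar, parentLoader, and "com.example.MyUdf" are placeholders, not names from the commit:

    import org.apache.flink.table.functions.hive.HiveFunctionWrapper;

    import java.io.File;
    import java.net.URL;
    import java.net.URLClassLoader;

    final class WrapperUsageSketch {
        static Object instantiate(File udfJar, ClassLoader parentLoader) throws Exception {
            // Resolve the UDF class once through an explicit user classloader...
            URLClassLoader userClassLoader =
                    new URLClassLoader(new URL[] {udfJar.toURI().toURL()}, parentLoader);
            Class<?> udfClass = userClassLoader.loadClass("com.example.MyUdf");

            // ...and let the wrapper pin it: createFunction() and later Kryo
            // deserialization use udfClass's own loader, not the thread's.
            HiveFunctionWrapper<Object> wrapper = new HiveFunctionWrapper<>(udfClass);
            return wrapper.createFunction();
        }
    }
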
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
index df753df152a..c15b5d85255 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDAF.java
@@ -172,7 +172,7 @@ public class HiveGenericUDAF
             throw new FlinkHiveUDFException(
                     String.format(
                             "Failed to create accumulator for %s",
-                            hiveFunctionWrapper.getClassName()),
+                            hiveFunctionWrapper.getUDFClassName()),
                     e);
         }
     }
@@ -206,7 +206,8 @@ public class HiveGenericUDAF
         } catch (HiveException e) {
             throw new FlinkHiveUDFException(
                     String.format(
-                            "Failed to get final result on %s", hiveFunctionWrapper.getClassName()),
+                            "Failed to get final result on %s",
+                            hiveFunctionWrapper.getUDFClassName()),
                     e);
         }
     }
@@ -247,7 +248,7 @@ public class HiveGenericUDAF
             throw new FlinkHiveUDFException(
                     String.format(
                             "Failed to get Hive result type from %s",
-                            hiveFunctionWrapper.getClassName()),
+                            hiveFunctionWrapper.getUDFClassName()),
                     e);
         }
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
index cca3a9ce4b8..10483d18024 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java
@@ -47,13 +47,13 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
     public HiveGenericUDF(HiveFunctionWrapper<GenericUDF> hiveFunctionWrapper, HiveShim hiveShim) {
         super(hiveFunctionWrapper);
         this.hiveShim = hiveShim;
-        LOG.info("Creating HiveGenericUDF from '{}'", hiveFunctionWrapper.getClassName());
+        LOG.info("Creating HiveGenericUDF from '{}'", hiveFunctionWrapper.getUDFClassName());
     }
 
     @Override
     public void openInternal() {
 
-        LOG.info("Open HiveGenericUDF as {}", hiveFunctionWrapper.getClassName());
+        LOG.info("Open HiveGenericUDF as {}", hiveFunctionWrapper.getUDFClassName());
 
         function = createFunction();
 
@@ -96,7 +96,7 @@ public class HiveGenericUDF extends HiveScalarFunction<GenericUDF> {
     public DataType inferReturnType() throws UDFArgumentException {
         LOG.info(
                 "Getting result type of HiveGenericUDF from {}",
-                hiveFunctionWrapper.getClassName());
+                hiveFunctionWrapper.getUDFClassName());
         ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
 
         ObjectInspector resultObjectInspector =
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
index 616ff096dec..35e35333bab 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDTF.java
@@ -145,7 +145,7 @@ public class HiveGenericUDTF extends TableFunction<Row> implements HiveFunction<
     public DataType inferReturnType() throws UDFArgumentException {
         LOG.info(
                 "Getting result type of HiveGenericUDTF with {}",
-                hiveFunctionWrapper.getClassName());
+                hiveFunctionWrapper.getUDFClassName());
         ObjectInspector[] argumentInspectors = HiveInspectors.getArgInspectors(hiveShim, arguments);
         return HiveTypeUtil.toFlinkType(
                 hiveFunctionWrapper.createFunction().initialize(argumentInspectors));
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
index 0620910f9da..459575303c2 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveScalarFunction.java
@@ -52,16 +52,12 @@ public abstract class HiveScalarFunction<UDFType> extends ScalarFunction
 
     @Override
     public boolean isDeterministic() {
-        try {
-            org.apache.hadoop.hive.ql.udf.UDFType udfType =
-                    hiveFunctionWrapper
-                            .getUDFClass()
-                            .getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
-
-            return udfType != null && udfType.deterministic() && !udfType.stateful();
-        } catch (ClassNotFoundException e) {
-            throw new FlinkHiveUDFException(e);
-        }
+        org.apache.hadoop.hive.ql.udf.UDFType udfType =
+                hiveFunctionWrapper
+                        .getUDFClass()
+                        .getAnnotation(org.apache.hadoop.hive.ql.udf.UDFType.class);
+
+        return udfType != null && udfType.deterministic() && !udfType.stateful();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
index 23adb64f043..daf2684482f 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveSimpleUDF.java
@@ -61,12 +61,12 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
     public HiveSimpleUDF(HiveFunctionWrapper<UDF> hiveFunctionWrapper, HiveShim hiveShim) {
         super(hiveFunctionWrapper);
         this.hiveShim = hiveShim;
-        LOG.info("Creating HiveSimpleUDF from '{}'", this.hiveFunctionWrapper.getClassName());
+        LOG.info("Creating HiveSimpleUDF from '{}'", this.hiveFunctionWrapper.getUDFClassName());
     }
 
     @Override
     public void openInternal() {
-        LOG.info("Opening HiveSimpleUDF as '{}'", hiveFunctionWrapper.getClassName());
+        LOG.info("Opening HiveSimpleUDF as '{}'", hiveFunctionWrapper.getUDFClassName());
 
         function = hiveFunctionWrapper.createFunction();
 
@@ -105,7 +105,7 @@ public class HiveSimpleUDF extends HiveScalarFunction<UDF> {
             throw new FlinkHiveUDFException(
                     String.format(
                             "Failed to open HiveSimpleUDF from %s",
-                            hiveFunctionWrapper.getClassName()),
+                            hiveFunctionWrapper.getUDFClassName()),
                     e);
         }
     }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
index b5297f72f25..562e053f1c8 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModule.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.catalog.hive.client.HiveShim;
 import org.apache.flink.table.catalog.hive.client.HiveShimLoader;
 import org.apache.flink.table.catalog.hive.factories.HiveFunctionDefinitionFactory;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.hive.udf.generic.GenericUDFLegacyGroupingID;
@@ -82,12 +83,17 @@ public class HiveModule implements Module {
     private final String hiveVersion;
     private final HiveShim hiveShim;
     private Set<String> functionNames;
+    private final ClassLoader classLoader;
 
     public HiveModule() {
-        this(HiveShimLoader.getHiveVersion());
+        this(HiveShimLoader.getHiveVersion(), Thread.currentThread().getContextClassLoader());
     }
 
     public HiveModule(String hiveVersion) {
+        this(hiveVersion, Thread.currentThread().getContextClassLoader());
+    }
+
+    public HiveModule(String hiveVersion, ClassLoader classLoader) {
         checkArgument(
                 !StringUtils.isNullOrWhitespaceOnly(hiveVersion), "hiveVersion cannot be null");
 
@@ -95,6 +101,7 @@ public class HiveModule implements Module {
         this.hiveShim = HiveShimLoader.loadHiveShim(hiveVersion);
         this.factory = new HiveFunctionDefinitionFactory(hiveShim);
         this.functionNames = new HashSet<>();
+        this.classLoader = classLoader;
     }
 
     @Override
@@ -114,18 +121,19 @@ public class HiveModule implements Module {
         if (BUILT_IN_FUNC_BLACKLIST.contains(name)) {
             return Optional.empty();
         }
+        FunctionDefinitionFactory.Context context = () -> classLoader;
         // We override Hive's grouping function. Refer to the implementation for more details.
         if (name.equalsIgnoreCase("grouping")) {
             return Optional.of(
                     factory.createFunctionDefinitionFromHiveFunction(
-                            name, HiveGenericUDFGrouping.class.getName()));
+                            name, HiveGenericUDFGrouping.class.getName(), context));
         }
 
         // this function is used to generate legacy GROUPING__ID value for old hive versions
         if (name.equalsIgnoreCase(GenericUDFLegacyGroupingID.NAME)) {
             return Optional.of(
                     factory.createFunctionDefinitionFromHiveFunction(
-                            name, GenericUDFLegacyGroupingID.class.getName()));
+                            name, GenericUDFLegacyGroupingID.class.getName(), context));
         }
 
         // We override Hive's internal_interval. Refer to the implementation for more details
@@ -140,7 +148,7 @@ public class HiveModule implements Module {
         return info.map(
                 functionInfo ->
                         factory.createFunctionDefinitionFromHiveFunction(
-                                name, functionInfo.getFunctionClass().getName()));
+                                name, functionInfo.getFunctionClass().getName(), context));
     }
 
     public String getHiveVersion() {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java
index 451ab52aaae..7d81e25fe62 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/module/hive/HiveModuleFactory.java
@@ -63,6 +63,6 @@ public class HiveModuleFactory implements ModuleFactory {
                         .getOptional(HIVE_VERSION)
                         .orElseGet(HiveShimLoader::getHiveVersion);
 
-        return new HiveModule(hiveVersion);
+        return new HiveModule(hiveVersion, context.getClassLoader());
     }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
index af8c14637f1..1a534c61d36 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParser.java
@@ -226,7 +226,8 @@ public class HiveParser extends ParserImpl {
                                 context,
                                 dmlHelper,
                                 frameworkConfig,
-                                plannerContext.getCluster());
+                                plannerContext.getCluster(),
+                                plannerContext.getFlinkContext().getClassLoader());
                 operation = ddlAnalyzer.convertToOperation(node);
                 return Collections.singletonList(operation);
             } else {
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
index 53679d97de8..835fc6b7783 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/parse/HiveParserDDLSemanticAnalyzer.java
@@ -200,6 +200,7 @@ public class HiveParserDDLSemanticAnalyzer {
     private final HiveParserDMLHelper dmlHelper;
     private final FrameworkConfig frameworkConfig;
     private final RelOptCluster cluster;
+    private final ClassLoader classLoader;
 
     static {
         TokenToTypeName.put(HiveASTParser.TOK_BOOLEAN, serdeConstants.BOOLEAN_TYPE_NAME);
@@ -262,7 +263,8 @@ public class HiveParserDDLSemanticAnalyzer {
             HiveParserContext context,
             HiveParserDMLHelper dmlHelper,
             FrameworkConfig frameworkConfig,
-            RelOptCluster cluster)
+            RelOptCluster cluster,
+            ClassLoader classLoader)
             throws SemanticException {
         this.queryState = queryState;
         this.conf = queryState.getConf();
@@ -276,6 +278,7 @@ public class HiveParserDDLSemanticAnalyzer {
         this.dmlHelper = dmlHelper;
         this.frameworkConfig = frameworkConfig;
         this.cluster = cluster;
+        this.classLoader = classLoader;
         reservedPartitionValues = new HashSet<>();
         // Partition can't have this name
         reservedPartitionValues.add(HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME));
@@ -524,7 +527,8 @@ public class HiveParserDDLSemanticAnalyzer {
             FunctionDefinition funcDefinition =
                     funcDefFactory.createFunctionDefinition(
                             functionName,
-                            new CatalogFunctionImpl(className, FunctionLanguage.JAVA));
+                            new CatalogFunctionImpl(className, FunctionLanguage.JAVA),
+                            () -> classLoader);
             return new CreateTempSystemFunctionOperation(functionName, false, funcDefinition);
         } else {
             ObjectIdentifier identifier = parseObjectIdentifier(functionName);
@@ -558,8 +562,7 @@ public class HiveParserDDLSemanticAnalyzer {
 
         FunctionDefinition macroDefinition =
                 new HiveGenericUDF(
-                        new HiveFunctionWrapper<>(GenericUDFMacro.class.getName(), macro),
-                        hiveShim);
+                        new HiveFunctionWrapper<>(GenericUDFMacro.class, macro), hiveShim);
         // hive's marco is more like flink's temp system function
         return new CreateTempSystemFunctionOperation(macroName, false, macroDefinition);
     }
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveFunctionWrapperTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveFunctionWrapperTest.java
new file mode 100644
index 00000000000..8c993381987
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveFunctionWrapperTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.functions.hive;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFMacro;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HiveFunctionWrapper}. */
+public class HiveFunctionWrapperTest {
+
+    @TempDir private static File tempFolder;
+
+    private static final Random random = new Random();
+    private static String udfClassName;
+    private static File udfJar;
+
+    @BeforeAll
+    static void before() throws Exception {
+        udfClassName = "MyToLower" + random.nextInt(50);
+        String udfCode =
+                "public class "
+                        + "%s"
+                        + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                        + "  public String eval(String str) {\n"
+                        + "    return str.toLowerCase();\n"
+                        + "  }\n"
+                        + "}\n";
+        udfJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempFolder,
+                        "test-classloader-udf.jar",
+                        udfClassName,
+                        String.format(udfCode, udfClassName));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testDeserializeUDF() throws Exception {
+        // test deserialize udf
+        GenericUDFMacro udfMacro = new GenericUDFMacro();
+        HiveFunctionWrapper<GenericUDFMacro> functionWrapper =
+                new HiveFunctionWrapper<>(GenericUDFMacro.class, udfMacro);
+        GenericUDFMacro deserializeUdfMacro = functionWrapper.createFunction();
+        assertThat(deserializeUdfMacro.getClass().getName())
+                .isEqualTo(GenericUDFMacro.class.getName());
+
+        // test deserialize udf loaded by user code class loader instead of current thread class
+        // loader
+        ClassLoader userClassLoader =
+                FlinkUserCodeClassLoaders.create(
+                        new URL[] {udfJar.toURI().toURL()},
+                        getClass().getClassLoader(),
+                        new Configuration());
+        Class<ScalarFunction> udfClass =
+                (Class<ScalarFunction>) userClassLoader.loadClass(udfClassName);
+        ScalarFunction udf = udfClass.newInstance();
+        HiveFunctionWrapper<ScalarFunction> functionWrapper1 =
+                new HiveFunctionWrapper<>(udfClass, udf);
+        ScalarFunction deserializedUdf = functionWrapper1.createFunction();
+        assertThat(deserializedUdf.getClass().getName()).isEqualTo(udfClassName);
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
index dfd73889033..771d85cef9e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDAFTest.java
@@ -116,8 +116,7 @@ public class HiveGenericUDAFTest {
 
     private static HiveGenericUDAF init(
             Class<?> hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
-        HiveFunctionWrapper<GenericUDAFResolver> wrapper =
-                new HiveFunctionWrapper<>(hiveUdfClass.getName());
+        HiveFunctionWrapper<GenericUDAFResolver> wrapper = new HiveFunctionWrapper<>(hiveUdfClass);
 
         CallContextMock callContext = new CallContextMock();
         callContext.argumentDataTypes = Arrays.asList(argTypes);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 7ded892f942..c5162df6087 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -334,9 +334,8 @@ public class HiveGenericUDFTest {
     }
 
     private static HiveGenericUDF init(
-            Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) {
-        HiveGenericUDF udf =
-                new HiveGenericUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
+            Class<?> hiveUdfClass, Object[] constantArgs, DataType[] argTypes) {
+        HiveGenericUDF udf = new HiveGenericUDF(new HiveFunctionWrapper<>(hiveUdfClass), hiveShim);
 
         CallContextMock callContext = new CallContextMock();
         callContext.argumentDataTypes = Arrays.asList(argTypes);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
index b82fd4b5f9e..3e7235a4e7e 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDTFTest.java
@@ -148,8 +148,8 @@ public class HiveGenericUDTFTest {
     }
 
     private static HiveGenericUDTF init(
-            Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
-        HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper(hiveUdfClass.getName());
+            Class<?> hiveUdfClass, Object[] constantArgs, DataType[] argTypes) throws Exception {
+        HiveFunctionWrapper<GenericUDTF> wrapper = new HiveFunctionWrapper<>(hiveUdfClass);
 
         CallContextMock callContext = new CallContextMock();
         callContext.argumentDataTypes = Arrays.asList(argTypes);
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
index 5a4e864b5b9..b143d2442d9 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveSimpleUDFTest.java
@@ -214,9 +214,8 @@ public class HiveSimpleUDFTest {
         assertThat(udf.eval(5, testInputs, testInputs)).isEqualTo(11);
     }
 
-    protected static HiveSimpleUDF init(Class hiveUdfClass, DataType[] argTypes) {
-        HiveSimpleUDF udf =
-                new HiveSimpleUDF(new HiveFunctionWrapper(hiveUdfClass.getName()), hiveShim);
+    protected static HiveSimpleUDF init(Class<?> hiveUdfClass, DataType[] argTypes) {
+        HiveSimpleUDF udf = new HiveSimpleUDF(new HiveFunctionWrapper<>(hiveUdfClass), hiveShim);
 
         // Hive UDF won't have literal args
         CallContextMock callContext = new CallContextMock();
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index bc662f56e91..67b796de25b 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -82,15 +82,6 @@ public class ExecutionContext {
         this.tableEnv = createTableEnvironment();
     }
 
-    /**
-     * Executes the given supplier using the execution context's classloader as thread classloader.
-     */
-    public <R> R wrapClassLoader(Supplier<R> supplier) {
-        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
-            return supplier.get();
-        }
-    }
-
     public StreamTableEnvironment getTableEnvironment() {
         return tableEnv;
     }
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index 1540dec8f65..82446e92dc2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -169,7 +169,7 @@ public class LocalExecutor implements Executor {
 
         List<Operation> operations;
         try {
-            operations = context.wrapClassLoader(() -> parser.parse(statement));
+            operations = parser.parse(statement);
         } catch (Throwable t) {
             throw new SqlExecutionException("Failed to parse statement: " + statement, t);
         }
@@ -186,10 +186,7 @@ public class LocalExecutor implements Executor {
                 (TableEnvironmentInternal) context.getTableEnvironment();
 
         try {
-            return context.wrapClassLoader(
-                    () ->
-                            Arrays.asList(
-                                    tableEnv.getParser().getCompletionHints(statement, position)));
+            return Arrays.asList(tableEnv.getParser().getCompletionHints(statement, position));
         } catch (Throwable t) {
             // catch everything such that the query does not crash the executor
             if (LOG.isDebugEnabled()) {
@@ -206,7 +203,7 @@ public class LocalExecutor implements Executor {
         final TableEnvironmentInternal tEnv =
                 (TableEnvironmentInternal) context.getTableEnvironment();
         try {
-            return context.wrapClassLoader(() -> tEnv.executeInternal(operation));
+            return tEnv.executeInternal(operation);
         } catch (Throwable t) {
             throw new SqlExecutionException(MESSAGE_SQL_EXECUTION_ERROR, t);
         }
@@ -219,7 +216,7 @@ public class LocalExecutor implements Executor {
         final TableEnvironmentInternal tEnv =
                 (TableEnvironmentInternal) context.getTableEnvironment();
         try {
-            return context.wrapClassLoader(() -> tEnv.executeInternal(operations));
+            return tEnv.executeInternal(operations);
         } catch (Throwable t) {
             throw new SqlExecutionException(MESSAGE_SQL_EXECUTION_ERROR, t);
         }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index 6a6c0c940ff..75a5be4d0ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -578,10 +578,15 @@ public final class FunctionCatalog {
                 FunctionDefinition fd;
                 if (catalog.getFunctionDefinitionFactory().isPresent()
                         && catalogFunction.getFunctionLanguage() != FunctionLanguage.PYTHON) {
+                    registerFunctionJarResources(
+                            oi.asSummaryString(), catalogFunction.getFunctionResources());
                     fd =
                             catalog.getFunctionDefinitionFactory()
                                     .get()
-                                    .createFunctionDefinition(oi.getObjectName(), catalogFunction);
+                                    .createFunctionDefinition(
+                                            oi.getObjectName(),
+                                            catalogFunction,
+                                            resourceManager::getUserClassLoader);
                 } else {
                     fd = getFunctionDefinition(oi.asSummaryString(), catalogFunction);
                 }
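
Since the new Context interface (see the flink-table-common diff below) declares
a single getClassLoader() method, a method reference such as
resourceManager::getUserClassLoader satisfies it directly, as the hunk above
does. A minimal sketch of the call shape, with illustrative surrounding names:

    import org.apache.flink.table.catalog.CatalogFunction;
    import org.apache.flink.table.factories.FunctionDefinitionFactory;
    import org.apache.flink.table.functions.FunctionDefinition;

    /** Illustrative call shape; factory, name, function and loader come from the session. */
    public final class FactoryCallSketch {

        public static FunctionDefinition create(
                FunctionDefinitionFactory factory,
                String name,
                CatalogFunction catalogFunction,
                ClassLoader userClassLoader) {
            // Context has one abstract method, so a lambda (or a method reference
            // such as resourceManager::getUserClassLoader) can be passed directly.
            return factory.createFunctionDefinition(name, catalogFunction, () -> userClassLoader);
        }
    }
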
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
index 1f91db8f156..7ff642863cf 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.factories;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 /** A factory to create {@link FunctionDefinition}. */
 @PublicEvolving
@@ -32,6 +33,41 @@ public interface FunctionDefinitionFactory {
      * @param name name of the {@link CatalogFunction}
      * @param catalogFunction the catalog function
      * @return a {@link FunctionDefinition}
+     * @deprecated Please implement {@link #createFunctionDefinition(String, CatalogFunction,
+     *     Context)} instead.
      */
-    FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+    @Deprecated
+    default FunctionDefinition createFunctionDefinition(
+            String name, CatalogFunction catalogFunction) {
+        throw new RuntimeException(
+                "Please implement FunctionDefinitionFactory#createFunctionDefinition(String, CatalogFunction, Context) instead.");
+    }
+
+    /**
+     * Creates a {@link FunctionDefinition} from the given {@link CatalogFunction} with the given
+     * {@link Context} containing the class loader of the current session, which is useful when
+     * classes need to be loaded by name.
+     *
+     * <p>The default implementation delegates to {@link #createFunctionDefinition(String,
+     * CatalogFunction)} with the thread context class loader set to the one from the {@link Context}.
+     *
+     * @param name name of the {@link CatalogFunction}
+     * @param catalogFunction the catalog function
+     * @param context the {@link Context} for creating the function definition
+     * @return a {@link FunctionDefinition}
+     */
+    default FunctionDefinition createFunctionDefinition(
+            String name, CatalogFunction catalogFunction, Context context) {
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(context.getClassLoader())) {
+            return createFunctionDefinition(name, catalogFunction);
+        }
+    }
+
+    /** Context provided when a function definition is created. */
+    @PublicEvolving
+    interface Context {
+        /** Returns the class loader of the current session. */
+        ClassLoader getClassLoader();
+    }
 }
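
A factory implementing the new Context-aware method directly should resolve the
function class through the Context's class loader rather than the thread's
context class loader. A minimal sketch under that assumption (the factory class
itself is hypothetical; error handling is trimmed):

    import org.apache.flink.table.catalog.CatalogFunction;
    import org.apache.flink.table.factories.FunctionDefinitionFactory;
    import org.apache.flink.table.functions.FunctionDefinition;
    import org.apache.flink.table.functions.UserDefinedFunction;

    /** Hypothetical factory that loads the function class via the session class loader. */
    public class ClassNameFunctionDefinitionFactory implements FunctionDefinitionFactory {

        @Override
        public FunctionDefinition createFunctionDefinition(
                String name, CatalogFunction catalogFunction, Context context) {
            try {
                // Resolve the class against the session class loader from the Context.
                Class<?> clazz =
                        Class.forName(catalogFunction.getClassName(), true, context.getClassLoader());
                // UserDefinedFunction implements FunctionDefinition in the Flink API.
                return (UserDefinedFunction) clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("Could not instantiate function " + name, e);
            }
        }
    }

This keeps function resolution tied to the session's registered resources (e.g.
jars added via USING JAR) rather than whatever loader happens to be installed on
the calling thread.
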
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFunctionDefinitionFactory.java
similarity index 54%
copy from flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
copy to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFunctionDefinitionFactory.java
index 1f91db8f156..78b25978cd8 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FunctionDefinitionFactory.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestFunctionDefinitionFactory.java
@@ -16,22 +16,24 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.factories;
+package org.apache.flink.table.planner.factories;
 
-import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 
-/** A factory to create {@link FunctionDefinition}. */
-@PublicEvolving
-public interface FunctionDefinitionFactory {
+/**
+ * A {@link FunctionDefinitionFactory} used to test function loading, ensuring a function is loaded
+ * correctly when only the legacy interface {@link
+ * FunctionDefinitionFactory#createFunctionDefinition(String, CatalogFunction)} is implemented.
+ */
+public class TestFunctionDefinitionFactory implements FunctionDefinitionFactory {
 
-    /**
-     * Creates a {@link FunctionDefinition} from given {@link CatalogFunction}.
-     *
-     * @param name name of the {@link CatalogFunction}
-     * @param catalogFunction the catalog function
-     * @return a {@link FunctionDefinition}
-     */
-    FunctionDefinition createFunctionDefinition(String name, CatalogFunction catalogFunction);
+    @Override
+    public FunctionDefinition createFunctionDefinition(
+            String name, CatalogFunction catalogFunction) {
+        return UserDefinedFunctionHelper.instantiateFunction(
+                Thread.currentThread().getContextClassLoader(), null, name, catalogFunction);
+    }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
index 86d9d9b6b69..69ee1955488 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesCatalog.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.factories.FunctionDefinitionFactory;
 import org.apache.flink.table.planner.utils.FilterUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -44,7 +45,7 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-/** Use TestValuesCatalog to test partition push down. */
+/** Use TestValuesCatalog to test partition push down and function definition creation. */
 public class TestValuesCatalog extends GenericInMemoryCatalog {
     private final boolean supportListPartitionByFilter;
 
@@ -95,6 +96,11 @@ public class TestValuesCatalog extends GenericInMemoryCatalog {
                 .collect(Collectors.toList());
     }
 
+    @Override
+    public Optional<FunctionDefinitionFactory> getFunctionDefinitionFactory() {
+        return Optional.of(new TestFunctionDefinitionFactory());
+    }
+
     private Function<String, Comparable<?>> getValueGetter(
             Map<String, String> spec, TableSchema schema) {
         return field -> {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
index d4019fecaa0..2704d564305 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/catalog/CatalogTableITCase.scala
@@ -22,12 +22,14 @@ import org.apache.flink.table.api.config.{ExecutionConfigOptions, TableConfigOpt
 import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.planner.expressions.utils.Func0
+import org.apache.flink.table.planner.factories.TestValuesCatalog
 import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory
 import org.apache.flink.table.planner.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0
 import org.apache.flink.table.planner.utils.DateTimeTestUtil.localDateTime
+import org.apache.flink.table.utils.UserDefinedFunctions.{GENERATED_LOWER_UDF_CLASS, GENERATED_LOWER_UDF_CODE}
 import org.apache.flink.test.util.AbstractTestBase
 import org.apache.flink.types.Row
-import org.apache.flink.util.FileUtils
+import org.apache.flink.util.{FileUtils, UserClassLoaderJarTestUtils}
 
 import org.junit.{Before, Rule, Test}
 import org.junit.Assert.{assertEquals, assertNotEquals, fail}
@@ -39,8 +41,10 @@ import java.io.File
 import java.math.{BigDecimal => JBigDecimal}
 import java.net.URI
 import java.util
+import java.util.UUID
 
 import scala.collection.JavaConversions._
+import scala.util.Random
 
 /** Test cases for catalog table. */
 @RunWith(classOf[Parameterized])
@@ -1235,6 +1239,40 @@ class CatalogTableITCase(isStreamingMode: Boolean) extends AbstractTestBase {
     expectedProperty.put("k2", "b")
     assertEquals(expectedProperty, database.getProperties)
   }
+
+  @Test
+  def testLoadFunction(): Unit = {
+    tableEnv.registerCatalog("cat2", new TestValuesCatalog("cat2", "default", true))
+    tableEnv.executeSql("use catalog cat2")
+    // test loading a custom function packaged in a jar
+    val random = new Random()
+    val udfClassName = GENERATED_LOWER_UDF_CLASS + random.nextInt(50)
+    val jarPath = UserClassLoaderJarTestUtils
+      .createJarFile(
+        AbstractTestBase.TEMPORARY_FOLDER.newFolder(String.format("test-jar-%s", UUID.randomUUID)),
+        "test-classloader-udf.jar",
+        udfClassName,
+        String.format(GENERATED_LOWER_UDF_CODE, udfClassName)
+      )
+      .toURI
+      .toString
+    tableEnv.executeSql(s"""create function lowerUdf as '$udfClassName' using jar '$jarPath'""")
+
+    TestCollectionTableFactory.reset()
+    TestCollectionTableFactory.initData(List(Row.of("BoB")))
+    val ddl1 =
+      """
+        |create table t1(
+        |  a varchar
+        |) with (
+        |  'connector' = 'COLLECTION'
+        |)
+      """.stripMargin
+    tableEnv.executeSql(ddl1)
+    assertEquals(
+      "+I[bob]",
+      tableEnv.executeSql("select lowerUdf(a) from t1").collect().next().toString)
+  }
 }
 
 object CatalogTableITCase {