Posted to commits@flink.apache.org by sh...@apache.org on 2022/07/20 09:36:04 UTC

[flink] branch master updated: [FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax

This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cfc10771635 [FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax
cfc10771635 is described below

commit cfc107716357c49b14ed59e0359e656764ea7f99
Author: fengli <ld...@163.com>
AuthorDate: Mon May 30 17:56:58 2022 +0800

    [FLINK-27659][table-planner] Support to use jar that is registered by 'CREATE FUNCTION USING JAR' syntax
    
    This closes #20001
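    
    A hedged sketch of the supported syntax (function name, class, and jar path
    are illustrative, not taken from this commit):
    
        CREATE FUNCTION my_lower AS 'com.example.MyLowerUDF'
            USING JAR '/path/to/udf.jar';
    
        SELECT my_lower(name) FROM my_table;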
---
 .../java/org/apache/flink/client/ClientUtils.java  |  18 +-
 .../flink/util/FlinkUserCodeClassLoaders.java      |  29 ++-
 .../flink/util/UserClassLoaderJarTestUtils.java    |  40 ++--
 flink-table/flink-sql-client/pom.xml               |   8 +
 .../client/gateway/context/ExecutionContext.java   |  31 +--
 .../client/gateway/context/SessionContext.java     | 155 ++++++---------
 .../client/resource/ClientResourceManager.java     |  51 +++++
 .../table/client/util/ClientClassloaderUtil.java   |  52 +++++
 .../client/util/ClientWrapperClassLoader.java      | 107 ++++++++++
 .../flink/table/client/cli/CliClientITCase.java    |  16 +-
 .../client/gateway/context/SessionContextTest.java |  19 +-
 .../table/client/gateway/local/DependencyTest.java |   7 +-
 .../client/gateway/local/LocalExecutorITCase.java  |  17 +-
 .../client/util/ClientWrapperClassLoaderTest.java  | 151 ++++++++++++++
 .../src/test/resources/sql/function.q              |  27 +++
 .../flink-sql-client/src/test/resources/sql/set.q  |   2 +-
 .../gateway/service/context/SessionContext.java    |  85 ++++----
 .../AbstractStreamTableEnvironmentImpl.java        |   7 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |  29 +--
 .../internal/StreamTableEnvironmentImplTest.java   |  18 +-
 .../table/api/internal/TableEnvironmentImpl.java   |  62 ++++--
 .../flink/table/catalog/FunctionCatalog.java       |  50 +++--
 .../flink/table/resource/ResourceManager.java      | 217 ++++++++++++++-------
 .../flink/table/catalog/FunctionCatalogTest.java   |  12 +-
 .../flink/table/resource/ResourceManagerTest.java  | 181 +++++++++--------
 .../flink/table/utils/TableEnvironmentMock.java    |  26 ++-
 .../flink/table}/utils/UserDefinedFunctions.java   |  17 +-
 .../internal/StreamTableEnvironmentImpl.scala      |  30 +--
 .../internal/StreamTableEnvironmentImplTest.scala  |  17 +-
 .../batch/sql/CompactManagedTableITCase.java       |   2 +-
 .../batch/sql/ForwardHashExchangeITCase.java       |   2 +-
 .../planner/runtime/batch/sql/FunctionITCase.java  | 184 +++++++++++++++++
 .../sql/agg/LocalAggregatePushDownITCase.java      |   2 +-
 .../planner/runtime/stream/sql/FunctionITCase.java | 144 +++++++++++++-
 .../flink/table/planner/utils/PlannerMocks.java    |  22 ++-
 .../planner/plan/utils/RexNodeExtractorTest.scala  |  14 +-
 .../batch/sql/BatchFileSystemITCaseBase.scala      |   5 +-
 .../batch/sql/PartitionableSourceITCase.scala      |  63 +++---
 .../planner/runtime/utils/BatchTestBase.scala      |   1 +
 .../planner/runtime/utils/StreamingTestBase.scala  |   1 +
 .../flink/table/planner/utils/TableTestBase.scala  |  40 ++--
 41 files changed, 1439 insertions(+), 522 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 95a223a2135..bf6febda792 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -26,7 +26,6 @@ import org.apache.flink.client.program.StreamContextEnvironment;
 import org.apache.flink.client.program.rest.retry.ExponentialWaitStrategy;
 import org.apache.flink.client.program.rest.retry.WaitStrategy;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobInitializationException;
@@ -44,7 +43,6 @@ import java.net.URLClassLoader;
 import java.util.List;
 import java.util.Optional;
 
-import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Utility functions for Flink client. */
@@ -62,21 +60,7 @@ public enum ClientUtils {
         for (int i = 0; i < classpaths.size(); i++) {
             urls[i + jars.size()] = classpaths.get(i);
         }
-        final String[] alwaysParentFirstLoaderPatterns =
-                CoreOptions.getParentFirstLoaderPatterns(configuration);
-        final String classLoaderResolveOrder =
-                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
-        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
-                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
-        final boolean checkClassloaderLeak =
-                configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
-        return FlinkUserCodeClassLoaders.create(
-                resolveOrder,
-                urls,
-                parent,
-                alwaysParentFirstLoaderPatterns,
-                NOOP_EXCEPTION_HANDLER,
-                checkClassloaderLeak);
+        return FlinkUserCodeClassLoaders.create(urls, parent, configuration);
     }
 
     public static void executeProgram(
diff --git a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
index 0a8f4afa896..39b726eaea0 100644
--- a/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
+++ b/flink-core/src/main/java/org/apache/flink/util/FlinkUserCodeClassLoaders.java
@@ -19,6 +19,7 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 
 import org.slf4j.Logger;
@@ -29,6 +30,8 @@ import java.net.URL;
 import java.util.Enumeration;
 import java.util.function.Consumer;
 
+import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+
 /** Gives the URLClassLoader a nicer name for debugging purposes. */
 @Internal
 public class FlinkUserCodeClassLoaders {
@@ -57,6 +60,25 @@ public class FlinkUserCodeClassLoaders {
         return wrapWithSafetyNet(classLoader, checkClassLoaderLeak);
     }
 
+    public static MutableURLClassLoader create(
+            final URL[] urls, final ClassLoader parent, final Configuration configuration) {
+        final String[] alwaysParentFirstLoaderPatterns =
+                CoreOptions.getParentFirstLoaderPatterns(configuration);
+        final String classLoaderResolveOrder =
+                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
+        final FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
+                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
+        final boolean checkClassloaderLeak =
+                configuration.getBoolean(CoreOptions.CHECK_LEAKED_CLASSLOADER);
+        return create(
+                resolveOrder,
+                urls,
+                parent,
+                alwaysParentFirstLoaderPatterns,
+                NOOP_EXCEPTION_HANDLER,
+                checkClassloaderLeak);
+    }
+
     public static MutableURLClassLoader create(
             ResolveOrder resolveOrder,
             URL[] urls,
@@ -129,13 +151,14 @@ public class FlinkUserCodeClassLoaders {
      * delegate is nulled and can be garbage collected. Additional class resolution will be resolved
      * solely through the bootstrap classloader and most likely result in ClassNotFound exceptions.
      */
-    private static class SafetyNetWrapperClassLoader extends MutableURLClassLoader {
+    @Internal
+    public static class SafetyNetWrapperClassLoader extends MutableURLClassLoader {
         private static final Logger LOG =
                 LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);
 
-        private volatile FlinkUserCodeClassLoader inner;
+        protected volatile FlinkUserCodeClassLoader inner;
 
-        SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {
+        protected SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {
             super(new URL[0], parent);
             this.inner = inner;
         }
diff --git a/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
index 42d3e3419db..46c61236dfe 100644
--- a/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/util/UserClassLoaderJarTestUtils.java
@@ -28,7 +28,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
 
@@ -43,11 +46,22 @@ public class UserClassLoaderJarTestUtils {
     /** Pack the generated class into a JAR and return the path of the JAR. */
     public static File createJarFile(File tmpDir, String jarName, String className, String javaCode)
             throws IOException {
-        // write class source code to file
-        File javaFile = Paths.get(tmpDir.toString(), className + ".java").toFile();
-        //noinspection ResultOfMethodCallIgnored
-        javaFile.createNewFile();
-        FileUtils.writeFileUtf8(javaFile, javaCode);
+        return createJarFile(tmpDir, jarName, Collections.singletonMap(className, javaCode));
+    }
+
+    /** Pack the generated classes into a JAR and return the path of the JAR. */
+    public static File createJarFile(
+            File tmpDir, String jarName, Map<String, String> classNameCodes) throws IOException {
+        List<File> javaFiles = new ArrayList<>();
+        for (Map.Entry<String, String> entry : classNameCodes.entrySet()) {
+            // write class source code to file
+            File javaFile = Paths.get(tmpDir.toString(), entry.getKey() + ".java").toFile();
+            //noinspection ResultOfMethodCallIgnored
+            javaFile.createNewFile();
+            FileUtils.writeFileUtf8(javaFile, entry.getValue());
+
+            javaFiles.add(javaFile);
+        }
 
         // compile class source code
         DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<>();
@@ -55,7 +69,7 @@ public class UserClassLoaderJarTestUtils {
         StandardJavaFileManager fileManager =
                 compiler.getStandardFileManager(diagnostics, null, null);
         Iterable<? extends JavaFileObject> compilationUnit =
-                fileManager.getJavaFileObjectsFromFiles(Collections.singletonList(javaFile));
+                fileManager.getJavaFileObjectsFromFiles(javaFiles);
         JavaCompiler.CompilationTask task =
                 compiler.getTask(
                         null,
@@ -67,14 +81,16 @@ public class UserClassLoaderJarTestUtils {
         task.call();
 
         // pack class file to jar
-        File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
         File jarFile = Paths.get(tmpDir.toString(), jarName).toFile();
         JarOutputStream jos = new JarOutputStream(new FileOutputStream(jarFile));
-        JarEntry jarEntry = new JarEntry(className + ".class");
-        jos.putNextEntry(jarEntry);
-        byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
-        jos.write(classBytes);
-        jos.closeEntry();
+        for (String className : classNameCodes.keySet()) {
+            File classFile = Paths.get(tmpDir.toString(), className + ".class").toFile();
+            JarEntry jarEntry = new JarEntry(className + ".class");
+            jos.putNextEntry(jarEntry);
+            byte[] classBytes = FileUtils.readAllBytes(classFile.toPath());
+            jos.write(classBytes);
+            jos.closeEntry();
+        }
         jos.close();
 
         return jarFile;
diff --git a/flink-table/flink-sql-client/pom.xml b/flink-table/flink-sql-client/pom.xml
index f6c3a99f60f..23aaf0c987e 100644
--- a/flink-table/flink-sql-client/pom.xml
+++ b/flink-table/flink-sql-client/pom.xml
@@ -136,6 +136,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-api-java</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 7ca4a09a30a..bc662f56e91 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -34,10 +34,11 @@ import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import java.lang.reflect.Method;
-import java.net.URLClassLoader;
 import java.util.function.Supplier;
 
 import static org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
@@ -53,16 +54,17 @@ public class ExecutionContext {
     // Members that should be reused in the same session.
     private final Configuration flinkConfig;
     private final SessionState sessionState;
-    private final URLClassLoader classLoader;
+    private final MutableURLClassLoader classLoader;
 
     private final StreamTableEnvironment tableEnv;
 
     public ExecutionContext(
-            Configuration flinkConfig, URLClassLoader classLoader, SessionState sessionState) {
+            Configuration flinkConfig,
+            MutableURLClassLoader classLoader,
+            SessionState sessionState) {
         this.flinkConfig = flinkConfig;
         this.sessionState = sessionState;
         this.classLoader = classLoader;
-
         this.tableEnv = createTableEnvironment();
     }
 
@@ -108,28 +110,26 @@ public class ExecutionContext {
         StreamExecutionEnvironment streamExecEnv =
                 new StreamExecutionEnvironment(new Configuration(flinkConfig), classLoader);
 
-        final Executor executor = lookupExecutor(streamExecEnv);
+        final Executor executor = lookupExecutor(streamExecEnv, classLoader);
 
-        // Updates the classloader of FunctionCatalog by the new classloader to solve ClassNotFound
-        // exception when use an udf created by add jar syntax, temporary solution until FLINK-14055
-        // is fixed
-        sessionState.functionCatalog.updateClassLoader(classLoader);
         return createStreamTableEnvironment(
                 streamExecEnv,
                 settings,
                 executor,
                 sessionState.catalogManager,
                 sessionState.moduleManager,
+                sessionState.resourceManager,
                 sessionState.functionCatalog,
                 classLoader);
     }
 
-    private StreamTableEnvironment createStreamTableEnvironment(
+    private static StreamTableEnvironment createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
             Executor executor,
             CatalogManager catalogManager,
             ModuleManager moduleManager,
+            ResourceManager resourceManager,
             FunctionCatalog functionCatalog,
             ClassLoader userClassLoader) {
 
@@ -149,20 +149,23 @@ public class ExecutionContext {
         return new StreamTableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 functionCatalog,
                 tableConfig,
                 env,
                 planner,
                 executor,
-                settings.isStreamingMode(),
-                userClassLoader);
+                settings.isStreamingMode());
     }
 
-    private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+    private static Executor lookupExecutor(
+            StreamExecutionEnvironment executionEnvironment, ClassLoader userClassLoader) {
         try {
             final ExecutorFactory executorFactory =
                     FactoryUtil.discoverFactory(
-                            classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
+                            userClassLoader,
+                            ExecutorFactory.class,
+                            ExecutorFactory.DEFAULT_IDENTIFIER);
             final Method createMethod =
                     executorFactory
                             .getClass()
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
index ede67d5c259..161599c0cf2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/SessionContext.java
@@ -19,12 +19,9 @@
 package org.apache.flink.table.client.gateway.context;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
-import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -35,7 +32,12 @@ import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.SqlExecutionException;
+import org.apache.flink.table.client.resource.ClientResourceManager;
+import org.apache.flink.table.client.util.ClientClassloaderUtil;
+import org.apache.flink.table.client.util.ClientWrapperClassLoader;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.JarUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
 
@@ -45,11 +47,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -71,17 +69,14 @@ public class SessionContext {
     private final Configuration sessionConfiguration;
 
     private final SessionState sessionState;
-    // SafetyNetWrapperClassLoader doesn't override the getURL therefore we need to maintain the
-    // dependencies by ourselves.
-    private Set<URL> dependencies;
-    private URLClassLoader classLoader;
+    private final ClientWrapperClassLoader classLoader;
     private ExecutionContext executionContext;
 
     private SessionContext(
             DefaultContext defaultContext,
             String sessionId,
             Configuration sessionConfiguration,
-            URLClassLoader classLoader,
+            ClientWrapperClassLoader classLoader,
             SessionState sessionState,
             ExecutionContext executionContext) {
         this.defaultContext = defaultContext;
@@ -90,7 +85,6 @@ public class SessionContext {
         this.classLoader = classLoader;
         this.sessionState = sessionState;
         this.executionContext = executionContext;
-        this.dependencies = new HashSet<>(defaultContext.getDependencies());
     }
 
     // --------------------------------------------------------------------------------------------
@@ -115,7 +109,7 @@ public class SessionContext {
 
     @VisibleForTesting
     Set<URL> getDependencies() {
-        return dependencies;
+        return sessionState.resourceManager.getLocalJarResources();
     }
 
     // --------------------------------------------------------------------------------------------
@@ -133,9 +127,6 @@ public class SessionContext {
         // If rebuild a new Configuration, it loses control of the SessionState if users wants to
         // modify the configuration
         resetSessionConfigurationToDefault(defaultContext.getFlinkConfig());
-        // Reset configuration will revert the `pipeline.jars`. To make the current classloader
-        // still work, add the maintained dependencies into the configuration.
-        updateClassLoaderAndDependencies(dependencies);
         executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
     }
 
@@ -186,11 +177,7 @@ public class SessionContext {
                 sessionState.catalogManager.getCatalog(name).ifPresent(Catalog::close);
             }
         }
-        try {
-            classLoader.close();
-        } catch (IOException e) {
-            LOG.debug("Error while closing class loader.", e);
-        }
+        classLoader.close();
     }
 
     // --------------------------------------------------------------------------------------------
@@ -208,25 +195,30 @@ public class SessionContext {
         // Init classloader
         // --------------------------------------------------------------------------------------------------------------
 
-        URLClassLoader classLoader =
-                ClientUtils.buildUserCodeClassLoader(
-                        defaultContext.getDependencies(),
-                        Collections.emptyList(),
-                        SessionContext.class.getClassLoader(),
+        // use a ClientWrapperClassLoader here to support removing jars
+        final ClientWrapperClassLoader userClassLoader =
+                new ClientWrapperClassLoader(
+                        ClientClassloaderUtil.buildUserClassLoader(
+                                defaultContext.getDependencies(),
+                                SessionContext.class.getClassLoader(),
+                                new Configuration(configuration)),
                         configuration);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init session state
         // --------------------------------------------------------------------------------------------------------------
 
-        ModuleManager moduleManager = new ModuleManager();
+        final ClientResourceManager resourceManager =
+                new ClientResourceManager(configuration, userClassLoader);
+
+        final ModuleManager moduleManager = new ModuleManager();
 
         final EnvironmentSettings settings =
                 EnvironmentSettings.newInstance().withConfiguration(configuration).build();
 
-        CatalogManager catalogManager =
+        final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
-                        .classLoader(classLoader)
+                        .classLoader(userClassLoader)
                         .config(configuration)
                         .defaultCatalog(
                                 settings.getBuiltInCatalogName(),
@@ -235,69 +227,63 @@ public class SessionContext {
                                         settings.getBuiltInDatabaseName()))
                         .build();
 
-        FunctionCatalog functionCatalog =
-                new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
-        SessionState sessionState =
-                new SessionState(catalogManager, moduleManager, functionCatalog);
+        final FunctionCatalog functionCatalog =
+                new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
+        final SessionState sessionState =
+                new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init ExecutionContext
         // --------------------------------------------------------------------------------------------------------------
 
         ExecutionContext executionContext =
-                new ExecutionContext(configuration, classLoader, sessionState);
+                new ExecutionContext(configuration, userClassLoader, sessionState);
 
         return new SessionContext(
                 defaultContext,
                 sessionId,
                 configuration,
-                classLoader,
+                userClassLoader,
                 sessionState,
                 executionContext);
     }
 
     public void addJar(String jarPath) {
-        URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to add local jars.");
-        if (dependencies.contains(jarURL)) {
-            return;
+        checkJarPath(jarPath, "SQL Client only supports to add local jars.");
+        try {
+            sessionState.resourceManager.registerJarResources(
+                    Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+        } catch (Exception e) {
+            LOG.warn(String.format("Could not register the specified jar [%s].", jarPath), e);
         }
-
-        // merge the jars in config with the jars maintained in session
-        Set<URL> jarsInConfig = getJarsInConfig();
-
-        Set<URL> newDependencies = new HashSet<>(dependencies);
-        newDependencies.addAll(jarsInConfig);
-        newDependencies.add(jarURL);
-        updateClassLoaderAndDependencies(newDependencies);
-
-        // renew the execution context
-        executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
     }
 
     public void removeJar(String jarPath) {
-        URL jarURL = getURLFromPath(jarPath, "SQL Client only supports to remove local jars.");
-        if (!dependencies.contains(jarURL)) {
+        // if the path is relative, convert it to an absolute path
+        URL jarURL = checkJarPath(jarPath, "SQL Client only supports to remove local jars.");
+        // remove jar from resource manager
+        jarURL = sessionState.resourceManager.unregisterJarResource(jarURL.getPath());
+        if (jarURL == null) {
             LOG.warn(
                     String.format(
-                            "Could not remove the specified jar because the jar path(%s) is not found in session classloader.",
+                            "Could not remove the specified jar because the jar path [%s] was not registered in the classloader.",
                             jarPath));
             return;
         }
-
-        Set<URL> newDependencies = new HashSet<>(dependencies);
-        // merge the jars in config with the jars maintained in session
-        Set<URL> jarsInConfig = getJarsInConfig();
-        newDependencies.addAll(jarsInConfig);
-        newDependencies.remove(jarURL);
-
-        updateClassLoaderAndDependencies(newDependencies);
-
-        // renew the execution context
-        executionContext = new ExecutionContext(sessionConfiguration, classLoader, sessionState);
+        // remove jar from classloader
+        classLoader.removeURL(jarURL);
     }
 
     public List<String> listJars() {
-        return dependencies.stream().map(URL::getPath).collect(Collectors.toList());
+        List<String> jars =
+                sessionState.resourceManager.getResources().keySet().stream()
+                        .filter(
+                                resourceUri ->
+                                        ResourceType.JAR.equals(resourceUri.getResourceType()))
+                        .map(ResourceUri::getUri)
+                        .collect(Collectors.toList());
+        LOG.info(String.format("Registered jars: %s", jars));
+        return jars;
     }
 
     // --------------------------------------------------------------------------------------------
@@ -308,15 +294,18 @@ public class SessionContext {
     public static class SessionState {
 
         public final CatalogManager catalogManager;
-        public final FunctionCatalog functionCatalog;
         public final ModuleManager moduleManager;
+        public final ClientResourceManager resourceManager;
+        public final FunctionCatalog functionCatalog;
 
         public SessionState(
                 CatalogManager catalogManager,
                 ModuleManager moduleManager,
+                ClientResourceManager resourceManager,
                 FunctionCatalog functionCatalog) {
             this.catalogManager = catalogManager;
             this.moduleManager = moduleManager;
+            this.resourceManager = resourceManager;
             this.functionCatalog = functionCatalog;
         }
     }
@@ -332,25 +321,7 @@ public class SessionContext {
         sessionConfiguration.addAll(defaultConf);
     }
 
-    private void updateClassLoaderAndDependencies(Collection<URL> newDependencies) {
-        // replace jars with the new dependencies
-        ConfigUtils.encodeCollectionToConfig(
-                sessionConfiguration,
-                PipelineOptions.JARS,
-                new ArrayList<>(newDependencies),
-                URL::toString);
-
-        // TODO: update the classloader in CatalogManager.
-        classLoader =
-                ClientUtils.buildUserCodeClassLoader(
-                        new ArrayList<>(newDependencies),
-                        Collections.emptyList(),
-                        SessionContext.class.getClassLoader(),
-                        sessionConfiguration);
-        dependencies = new HashSet<>(newDependencies);
-    }
-
-    private URL getURLFromPath(String jarPath, String message) {
+    private URL checkJarPath(String jarPath, String message) {
         Path path = new Path(jarPath);
         String scheme = path.toUri().getScheme();
         if (scheme != null && !scheme.equals("file")) {
@@ -372,18 +343,4 @@ public class SessionContext {
                     e);
         }
     }
-
-    private Set<URL> getJarsInConfig() {
-        Set<URL> jarsInConfig;
-        try {
-            jarsInConfig =
-                    new HashSet<>(
-                            ConfigUtils.decodeListFromConfig(
-                                    sessionConfiguration, PipelineOptions.JARS, URL::new));
-        } catch (MalformedURLException e) {
-            throw new SqlExecutionException(
-                    "Failed to parse the option `pipeline.jars` in configuration.", e);
-        }
-        return jarsInConfig;
-    }
 }
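
The addJar/removeJar/listJars methods above back the SQL Client's jar commands. A minimal
session sketch of the statements they serve (the jar path is illustrative):

    ADD JAR '/path/to/udf.jar';
    SHOW JARS;    -- lists /path/to/udf.jar among the registered jars
    REMOVE JAR '/path/to/udf.jar';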
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
new file mode 100644
index 00000000000..00d1e780d02
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/resource/ClientResourceManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.resource;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.table.resource.ResourceType;
+import org.apache.flink.table.resource.ResourceUri;
+import org.apache.flink.util.MutableURLClassLoader;
+
+import javax.annotation.Nullable;
+
+import java.net.URL;
+
+/**
+ * The {@link ClientResourceManager} is able to remove a registered JAR resource by its specified
+ * jar path.
+ *
+ * <p>After the JAR resource has been removed, the {@link ResourceManager} is able to register a
+ * JAR resource under the same jar path again. Note that the removal doesn't guarantee that a
+ * {@link Class} already loaded from the removed jar becomes inaccessible.
+ */
+@Internal
+public class ClientResourceManager extends ResourceManager {
+
+    public ClientResourceManager(Configuration config, MutableURLClassLoader userClassLoader) {
+        super(config, userClassLoader);
+    }
+
+    @Nullable
+    public URL unregisterJarResource(String jarPath) {
+        return resourceInfos.remove(new ResourceUri(ResourceType.JAR, jarPath));
+    }
+}
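
Per the Javadoc above, a jar path can be registered again once it has been unregistered.
A hedged SQL Client sketch of that cycle (illustrative path):

    ADD JAR '/path/to/udf.jar';
    REMOVE JAR '/path/to/udf.jar';
    ADD JAR '/path/to/udf.jar';   -- registering the same path again is allowed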
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java
new file mode 100644
index 00000000000..84b57a604ed
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientClassloaderUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+
+/** Utilities for {@link ClientWrapperClassLoader}. */
+public class ClientClassloaderUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientClassloaderUtil.class);
+
+    public static FlinkUserCodeClassLoader buildUserClassLoader(
+            List<URL> jarUrls, ClassLoader parentClassLoader, Configuration conf) {
+        LOG.debug(
+                String.format(
+                        "Set option '%s' to 'false' to use %s.",
+                        CoreOptions.CHECK_LEAKED_CLASSLOADER.key(),
+                        ClientWrapperClassLoader.class.getSimpleName()));
+        conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
+        return (FlinkUserCodeClassLoader)
+                ClientUtils.buildUserCodeClassLoader(
+                        jarUrls, Collections.emptyList(), parentClassLoader, conf);
+    }
+
+    private ClientClassloaderUtil() {}
+}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java
new file mode 100644
index 00000000000..0cfcb939f14
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/util/ClientWrapperClassLoader.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkUserCodeClassLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.FlinkUserCodeClassLoaders.SafetyNetWrapperClassLoader;
+
+/**
+ * This class loader extends {@link SafetyNetWrapperClassLoader}. In addition to the {@code addURL}
+ * method, it exposes a {@code removeURL} method that removes a no longer needed jar from the
+ * current classloader path. It wraps a {@link FlinkUserCodeClassLoader} together with a list of
+ * superseded classloaders; class loading is delegated to the inner {@link
+ * FlinkUserCodeClassLoader}.
+ *
+ * <p>Currently, this is only used by the SQL Client to support the {@code REMOVE JAR} clause. When
+ * a jar is removed, the registered jar URL list is first fetched from the current {@link
+ * FlinkUserCodeClassLoader}, then a new {@link FlinkUserCodeClassLoader} instance is created whose
+ * URLs no longer include the removed jar. The current classloader is pointed to the new instance,
+ * and the old instance is added to a list so that it can be closed together with the {@link
+ * ClientWrapperClassLoader}.
+ *
+ * <p>Note: This classloader is not guaranteed to actually remove classes or resources; any classes
+ * or resources from the removed jar that are already loaded remain accessible.
+ */
+@Experimental
+@Internal
+public class ClientWrapperClassLoader extends SafetyNetWrapperClassLoader {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ClientWrapperClassLoader.class);
+
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    private final Configuration configuration;
+    private final List<FlinkUserCodeClassLoader> originClassLoaders;
+
+    public ClientWrapperClassLoader(FlinkUserCodeClassLoader inner, Configuration configuration) {
+        super(inner, inner.getParent());
+        this.configuration = new Configuration(configuration);
+        this.originClassLoaders = new ArrayList<>();
+    }
+
+    public void removeURL(URL url) {
+        Set<URL> registeredUrls = Stream.of(inner.getURLs()).collect(Collectors.toSet());
+        if (!registeredUrls.contains(url)) {
+            LOG.warn(
+                    String.format(
+                            "Could not remove the specified jar because the jar path [%s] is not found in the classloader.",
+                            url));
+            return;
+        }
+
+        originClassLoaders.add(inner);
+        // build a new classloader without removed jars
+        registeredUrls.remove(url);
+        inner =
+                ClientClassloaderUtil.buildUserClassLoader(
+                        new ArrayList<>(registeredUrls),
+                        ClientWrapperClassLoader.class.getClassLoader(),
+                        configuration);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        // close the superseded classloaders kept in the list
+        for (FlinkUserCodeClassLoader classLoader : originClassLoaders) {
+            try {
+                classLoader.close();
+            } catch (IOException e) {
+                LOG.error("Failed to close the origin classloader.", e);
+            }
+        }
+
+        originClassLoaders.clear();
+    }
+}
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
index 45ee390095f..e31ee990058 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.table.client.cli.utils.TestSqlStatement;
 import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
 import org.apache.flink.table.client.gateway.local.LocalExecutor;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
 import org.apache.flink.table.planner.utils.TableTestUtil;
 import org.apache.flink.test.util.AbstractTestBase;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
@@ -69,6 +68,10 @@ import java.util.stream.Collectors;
 
 import static org.apache.flink.configuration.JobManagerOptions.ADDRESS;
 import static org.apache.flink.configuration.RestOptions.PORT;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test that runs every {@code xx.q} file in "resources/sql/" path as a test. */
@@ -99,12 +102,19 @@ public class CliClientITCase extends AbstractTestBase {
 
     @BeforeClass
     public static void setup() throws IOException {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(
+                GENERATED_LOWER_UDF_CLASS,
+                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+        classNameCodes.put(
+                GENERATED_UPPER_UDF_CLASS,
+                String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+
         File udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
-                        UserDefinedFunctions.GENERATED_UDF_CLASS,
-                        UserDefinedFunctions.GENERATED_UDF_CODE);
+                        classNameCodes);
         URL udfDependency = udfJar.toURI().toURL();
         historyPath = tempFolder.newFile("history").toPath();
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
index 7f9f1574a85..3140aed5dec 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/context/SessionContextTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.client.gateway.context;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
 import org.junit.Before;
@@ -43,6 +42,8 @@ import static org.apache.flink.configuration.PipelineOptions.NAME;
 import static org.apache.flink.configuration.PipelineOptions.OBJECT_REUSE;
 import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_SQL_DIALECT;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 import static org.assertj.core.api.HamcrestCondition.matching;
@@ -62,8 +63,8 @@ public class SessionContextTest {
                 UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
-                        UserDefinedFunctions.GENERATED_UDF_CLASS,
-                        UserDefinedFunctions.GENERATED_UDF_CODE);
+                        GENERATED_LOWER_UDF_CLASS,
+                        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
     }
 
     @Before
@@ -164,14 +165,6 @@ public class SessionContextTest {
                 "hdfs://remote:10080/remote.jar", "SQL Client only supports to add local jars.");
     }
 
-    @Test
-    public void testAddIllegalJarInConfig() {
-        Configuration innerConfig = (Configuration) sessionContext.getReadableConfig();
-        innerConfig.set(JARS, Collections.singletonList("/path/to/illegal.jar"));
-
-        validateAddJarWithException(udfJar.getPath(), "no protocol: /path/to/illegal.jar");
-    }
-
     @Test
     public void testRemoveJarWithFullPath() {
         validateRemoveJar(udfJar.getPath());
@@ -224,14 +217,12 @@ public class SessionContextTest {
         return sessionContext.getExecutionContext().getTableEnvironment().getConfig();
     }
 
-    private void validateAddJar(String jarPath) throws IOException {
+    private void validateAddJar(String jarPath) {
         sessionContext.addJar(jarPath);
         assertThat(sessionContext.listJars()).containsExactly(udfJar.getPath());
-        assertThat(getConfiguration().get(JARS)).containsExactly(udfJar.toURI().toURL().toString());
         // reset to the default
         sessionContext.reset();
         assertThat(sessionContext.listJars()).containsExactly(udfJar.getPath());
-        assertThat(getConfiguration().get(JARS)).containsExactly(udfJar.toURI().toURL().toString());
     }
 
     private void validateRemoveJar(String jarPath) {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 6a4dc267612..2de67ff74fc 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -57,8 +57,6 @@ import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Test;
 
-import java.net.URL;
-import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -80,7 +78,6 @@ public class DependencyTest {
 
     public static final String CATALOG_TYPE_TEST = "DependencyTest";
 
-    private static final String TABLE_FACTORY_JAR_FILE = "table-factories-test-jar.jar";
     private static final List<String> INIT_SQL =
             Arrays.asList(
                     String.format(
@@ -151,12 +148,10 @@ public class DependencyTest {
     }
 
     private LocalExecutor createLocalExecutor() throws Exception {
-        // create executor with dependencies
-        final URL dependency = Paths.get("target", TABLE_FACTORY_JAR_FILE).toUri().toURL();
         // create default context
         DefaultContext defaultContext =
                 new DefaultContext(
-                        Collections.singletonList(dependency),
+                        Collections.emptyList(),
                         new Configuration(),
                         Collections.singletonList(new DefaultCLI()));
         LocalExecutor executor = new LocalExecutor(defaultContext);
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index df1ba55cfe5..7c7a5062136 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -34,13 +34,12 @@ import org.apache.flink.table.client.gateway.Executor;
 import org.apache.flink.table.client.gateway.ResultDescriptor;
 import org.apache.flink.table.client.gateway.TypedResult;
 import org.apache.flink.table.client.gateway.context.DefaultContext;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions;
-import org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.TableUDF;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.utils.UserDefinedFunctions;
 import org.apache.flink.table.utils.print.RowDataToStringConverter;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -71,7 +70,8 @@ import java.util.stream.Stream;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
 import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_MAX_TABLE_RESULT_ROWS;
 import static org.apache.flink.table.client.config.SqlClientOptions.EXECUTION_RESULT_MODE;
-import static org.apache.flink.table.client.gateway.utils.UserDefinedFunctions.ScalarUDF;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -104,8 +104,8 @@ public class LocalExecutorITCase extends TestLogger {
                 UserClassLoaderJarTestUtils.createJarFile(
                         tempFolder.newFolder("test-jar"),
                         "test-classloader-udf.jar",
-                        UserDefinedFunctions.GENERATED_UDF_CLASS,
-                        UserDefinedFunctions.GENERATED_UDF_CODE);
+                        GENERATED_LOWER_UDF_CLASS,
+                        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
         udfDependency = udfJar.toURI().toURL();
     }
 
@@ -565,11 +565,14 @@ public class LocalExecutorITCase extends TestLogger {
     private List<String> getInitSQL(final Map<String, String> replaceVars) {
         return Stream.of(
                         String.format(
-                                "CREATE FUNCTION scalarUDF AS '%s'", ScalarUDF.class.getName()),
+                                "CREATE FUNCTION scalarUDF AS '%s'",
+                                UserDefinedFunctions.ScalarUDF.class.getName()),
                         String.format(
                                 "CREATE FUNCTION aggregateUDF AS '%s'",
                                 AggregateFunction.class.getName()),
-                        String.format("CREATE FUNCTION tableUDF AS '%s'", TableUDF.class.getName()),
+                        String.format(
+                                "CREATE FUNCTION tableUDF AS '%s'",
+                                UserDefinedFunctions.TableUDF.class.getName()),
                         "CREATE TABLE TableNumber1 (\n"
                                 + "  IntegerField1 INT,\n"
                                 + "  StringField1 STRING,\n"
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java
new file mode 100644
index 00000000000..941ceaa1388
--- /dev/null
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/util/ClientWrapperClassLoaderTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.util;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.net.URL;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_UPPER_UDF_CODE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/** Tests for classloading and class loader utilities. */
+public class ClientWrapperClassLoaderTest {
+
+    private static File userJar;
+
+    @BeforeAll
+    public static void prepare(@TempDir File tempDir) throws Exception {
+        Map<String, String> classNameCodes = new HashMap<>();
+        classNameCodes.put(
+                GENERATED_LOWER_UDF_CLASS,
+                String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+        classNameCodes.put(
+                GENERATED_UPPER_UDF_CLASS,
+                String.format(GENERATED_UPPER_UDF_CODE, GENERATED_UPPER_UDF_CLASS));
+        userJar =
+                UserClassLoaderJarTestUtils.createJarFile(
+                        tempDir, "test-classloader.jar", classNameCodes);
+    }
+
+    @Test
+    public void testClassLoadingByAddURL() throws Exception {
+        Configuration configuration = new Configuration();
+        final ClientWrapperClassLoader classLoader =
+                new ClientWrapperClassLoader(
+                        ClientClassloaderUtil.buildUserClassLoader(
+                                Collections.emptyList(),
+                                getClass().getClassLoader(),
+                                configuration),
+                        configuration);
+
+        // the class should not be loadable before the jar URL is added
+        assertClassNotFoundException(GENERATED_LOWER_UDF_CLASS, classLoader);
+
+        // add jar url to ClassLoader
+        classLoader.addURL(userJar.toURI().toURL());
+
+        assertEquals(1, classLoader.getURLs().length);
+
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+
+        assertEquals(clazz1, clazz2);
+
+        classLoader.close();
+    }
+
+    @Test
+    public void testClassLoadingByRemoveURL() throws Exception {
+        URL jarURL = userJar.toURI().toURL();
+        Configuration configuration = new Configuration();
+        final ClientWrapperClassLoader classLoader =
+                new ClientWrapperClassLoader(
+                        ClientClassloaderUtil.buildUserClassLoader(
+                                Collections.singletonList(jarURL),
+                                getClass().getClassLoader(),
+                                configuration),
+                        configuration);
+
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, classLoader);
+        assertEquals(clazz1, clazz2);
+
+        // remove jar url
+        classLoader.removeURL(jarURL);
+
+        assertEquals(0, classLoader.getURLs().length);
+
+        // the class should no longer be loadable after the jar URL is removed
+        assertClassNotFoundException(GENERATED_UPPER_UDF_CLASS, classLoader);
+
+        // add jar url to ClassLoader again
+        classLoader.addURL(jarURL);
+
+        assertEquals(1, classLoader.getURLs().length);
+
+        final Class<?> clazz3 = Class.forName(GENERATED_UPPER_UDF_CLASS, false, classLoader);
+        final Class<?> clazz4 = Class.forName(GENERATED_UPPER_UDF_CLASS, false, classLoader);
+        assertEquals(clazz3, clazz4);
+
+        classLoader.close();
+    }
+
+    @Test
+    public void testParallelCapable() {
+        // This is true only if all superclasses (except Object) of the caller are
+        // registered as parallel capable.
+        assertTrue(TestClientWrapperClassLoader.IS_PARALLEL_CAPABLE);
+    }
+
+    private void assertClassNotFoundException(String className, ClassLoader classLoader) {
+        CommonTestUtils.assertThrows(
+                className,
+                ClassNotFoundException.class,
+                () -> Class.forName(className, false, classLoader));
+    }
+
+    private static class TestClientWrapperClassLoader extends ClientWrapperClassLoader {
+
+        public static final boolean IS_PARALLEL_CAPABLE = ClassLoader.registerAsParallelCapable();
+
+        TestClientWrapperClassLoader() {
+            super(
+                    ClientClassloaderUtil.buildUserClassLoader(
+                            Collections.emptyList(),
+                            TestClientWrapperClassLoader.class.getClassLoader(),
+                            new Configuration()),
+                    new Configuration());
+        }
+    }
+}
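
The test above mirrors what the SQL client does for ADD JAR and REMOVE JAR against its
session classloader. A condensed sketch of that cycle, using only the surface exercised
by the test (the jar path is a placeholder and exception handling is elided):

    Configuration conf = new Configuration();
    ClientWrapperClassLoader loader =
            new ClientWrapperClassLoader(
                    ClientClassloaderUtil.buildUserClassLoader(
                            Collections.emptyList(),
                            Thread.currentThread().getContextClassLoader(),
                            conf),
                    conf);

    URL jar = new File("/tmp/udf.jar").toURI().toURL(); // placeholder path
    loader.addURL(jar);    // classes in the jar become loadable
    loader.removeURL(jar); // and stop being loadable again
    loader.close();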
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/function.q b/flink-table/flink-sql-client/src/test/resources/sql/function.q
index 924c96f9b76..705bb45351d 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/function.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/function.q
@@ -209,6 +209,33 @@ org.apache.flink.table.api.ValidationException: Alter temporary catalog function
 !error
 
 
+# ==========================================================================
+# test create function using jar
+# ==========================================================================
+
+REMOVE JAR '$VAR_UDF_JAR_PATH';
+[INFO] The specified jar is removed from session classloader.
+!info
+
+create function upperudf AS 'UpperUDF' using jar '$VAR_UDF_JAR_PATH';
+[INFO] Execute statement succeed.
+!info
+
+# run a query to verify the registered UDF works
+SELECT id, upperudf(str) FROM (VALUES (1, 'hello world'), (2, 'hi')) as T(id, str);
++----+-------------+--------------------------------+
+| op |          id |                         EXPR$1 |
++----+-------------+--------------------------------+
+| +I |           1 |                    HELLO WORLD |
+| +I |           2 |                             HI |
++----+-------------+--------------------------------+
+Received a total of 2 rows
+!ok
+
+SHOW JARS;
+$VAR_UDF_JAR_PATH
+!ok
+
 # ==========================================================================
 # test function with hive catalog
 # ==========================================================================
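
The statements above exercise the new path end to end: the jar is first detached with
REMOVE JAR, then re-attached implicitly by CREATE FUNCTION ... USING JAR when the
function is validated. The same flow is available programmatically through the Table
API; a minimal sketch, assuming a jar at a placeholder path containing a hypothetical
class com.example.UpperUDF:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CreateFunctionUsingJarExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // The jar is attached to the user classloader lazily, when the
            // function is first validated or instantiated.
            tEnv.executeSql(
                    "CREATE FUNCTION upperudf AS 'com.example.UpperUDF' "
                            + "USING JAR '/tmp/udf.jar'");
            tEnv.executeSql("SELECT upperudf('hello world')").print();
        }
    }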
diff --git a/flink-table/flink-sql-client/src/test/resources/sql/set.q b/flink-table/flink-sql-client/src/test/resources/sql/set.q
index a2d01e005f8..06edfa63997 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/set.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/set.q
@@ -173,7 +173,7 @@ set;
 'execution.target' = 'remote'
 'jobmanager.rpc.address' = 'localhost'
 'pipeline.classpaths' = ''
-'pipeline.jars' = '$VAR_PIPELINE_JARS_URL'
+'pipeline.jars' = ''
 'rest.port' = '$VAR_REST_PORT'
 'sql-client.verbose' = 'true'
 'table.exec.legacy-cast-behaviour' = 'DISABLED'
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
index 15ad5a7050f..ba2298548e5 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java
@@ -18,16 +18,15 @@
 
 package org.apache.flink.table.gateway.service.context;
 
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.config.TableConfigOptions;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
@@ -44,6 +43,9 @@ import org.apache.flink.table.gateway.service.operation.OperationExecutor;
 import org.apache.flink.table.gateway.service.operation.OperationManager;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,11 +54,10 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.ArrayList;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -177,26 +178,31 @@ public class SessionContext {
 
         Configuration configuration = defaultContext.getFlinkConfig().clone();
         configuration.addAll(sessionConf);
+        // every session configures its own local resource download directory
+        setResourceDownloadTmpDir(sessionConf, sessionId);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init classloader
         // --------------------------------------------------------------------------------------------------------------
 
-        URLClassLoader classLoader =
-                buildClassLoader(Collections.emptySet(), Collections.emptySet(), configuration);
+        final MutableURLClassLoader userClassLoader =
+                FlinkUserCodeClassLoaders.create(
+                        new URL[0], SessionContext.class.getClassLoader(), configuration);
 
         // --------------------------------------------------------------------------------------------------------------
         // Init session state
         // --------------------------------------------------------------------------------------------------------------
 
-        ModuleManager moduleManager = new ModuleManager();
+        final ResourceManager resourceManager = new ResourceManager(configuration, userClassLoader);
+
+        final ModuleManager moduleManager = new ModuleManager();
 
         final EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(configuration);
 
         CatalogManager catalogManager =
                 CatalogManager.newBuilder()
                         // Currently, the classloader is only used by DataTypeFactory.
-                        .classLoader(classLoader)
+                        .classLoader(userClassLoader)
                         .config(configuration)
                         .defaultCatalog(
                                 settings.getBuiltInCatalogName(),
@@ -205,37 +211,21 @@ public class SessionContext {
                                         settings.getBuiltInDatabaseName()))
                         .build();
 
-        FunctionCatalog functionCatalog =
-                new FunctionCatalog(configuration, catalogManager, moduleManager, classLoader);
+        final FunctionCatalog functionCatalog =
+                new FunctionCatalog(configuration, resourceManager, catalogManager, moduleManager);
         SessionState sessionState =
-                new SessionState(catalogManager, moduleManager, functionCatalog);
+                new SessionState(catalogManager, moduleManager, resourceManager, functionCatalog);
 
         return new SessionContext(
                 defaultContext,
                 sessionId,
                 endpointVersion,
                 configuration,
-                classLoader,
+                userClassLoader,
                 sessionState,
                 new OperationManager(operationExecutorService));
     }
 
-    private static URLClassLoader buildClassLoader(
-            Set<URL> envDependencies, Set<URL> userDependencies, Configuration conf) {
-        Set<URL> newDependencies = new HashSet<>();
-        newDependencies.addAll(envDependencies);
-        newDependencies.addAll(userDependencies);
-
-        // override to use SafetyNetWrapperClassLoader
-        conf.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, true);
-
-        return ClientUtils.buildUserCodeClassLoader(
-                new ArrayList<>(newDependencies),
-                Collections.emptyList(),
-                SessionContext.class.getClassLoader(),
-                conf);
-    }
-
     // ------------------------------------------------------------------------------------------------------------------
     // Helpers
     // ------------------------------------------------------------------------------------------------------------------
@@ -251,7 +241,7 @@ public class SessionContext {
         tableConfig.setRootConfiguration(defaultContext.getFlinkConfig());
         tableConfig.addConfiguration(sessionConf);
 
-        final Executor executor = lookupExecutor(streamExecEnv);
+        final Executor executor = lookupExecutor(streamExecEnv, userClassloader);
         return createStreamTableEnvironment(
                 streamExecEnv,
                 settings,
@@ -259,29 +249,29 @@ public class SessionContext {
                 executor,
                 sessionState.catalogManager,
                 sessionState.moduleManager,
-                sessionState.functionCatalog,
-                userClassloader);
+                sessionState.resourceManager,
+                sessionState.functionCatalog);
     }
 
     public OperationManager getOperationManager() {
         return operationManager;
     }
 
-    private TableEnvironmentInternal createStreamTableEnvironment(
+    private static TableEnvironmentInternal createStreamTableEnvironment(
             StreamExecutionEnvironment env,
             EnvironmentSettings settings,
             TableConfig tableConfig,
             Executor executor,
             CatalogManager catalogManager,
             ModuleManager moduleManager,
-            FunctionCatalog functionCatalog,
-            ClassLoader userClassLoader) {
+            ResourceManager resourceManager,
+            FunctionCatalog functionCatalog) {
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor,
                         tableConfig,
-                        userClassLoader,
+                        resourceManager.getUserClassLoader(),
                         moduleManager,
                         catalogManager,
                         functionCatalog);
@@ -289,20 +279,21 @@ public class SessionContext {
         return new StreamTableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 functionCatalog,
                 tableConfig,
                 env,
                 planner,
                 executor,
-                settings.isStreamingMode(),
-                userClassLoader);
+                settings.isStreamingMode());
     }
 
-    private Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+    private static Executor lookupExecutor(
+            StreamExecutionEnvironment executionEnvironment, ClassLoader userClassLoader) {
         try {
             final ExecutorFactory executorFactory =
                     FactoryUtil.discoverFactory(
-                            userClassloader,
+                            userClassLoader,
                             ExecutorFactory.class,
                             ExecutorFactory.DEFAULT_IDENTIFIER);
             final Method createMethod =
@@ -326,6 +317,17 @@ public class SessionContext {
         return new StreamExecutionEnvironment(new Configuration(sessionConf), userClassloader);
     }
 
+    private static void setResourceDownloadTmpDir(
+            Configuration configuration, SessionHandle sessionId) {
+        Path path =
+                Paths.get(
+                        configuration.get(TableConfigOptions.RESOURCE_DOWNLOAD_DIR),
+                        String.format("sql-gateway-%s", sessionId));
+        // override resource download temp directory
+        configuration.set(
+                TableConfigOptions.RESOURCE_DOWNLOAD_DIR, path.toAbsolutePath().toString());
+    }
+
     // --------------------------------------------------------------------------------------------
     // Inner class
     // --------------------------------------------------------------------------------------------
@@ -334,15 +336,18 @@ public class SessionContext {
     public static class SessionState {
 
         public final CatalogManager catalogManager;
+        public final ResourceManager resourceManager;
         public final FunctionCatalog functionCatalog;
         public final ModuleManager moduleManager;
 
         public SessionState(
                 CatalogManager catalogManager,
                 ModuleManager moduleManager,
+                ResourceManager resourceManager,
                 FunctionCatalog functionCatalog) {
             this.catalogManager = catalogManager;
             this.moduleManager = moduleManager;
+            this.resourceManager = resourceManager;
             this.functionCatalog = functionCatalog;
         }
     }
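
With setResourceDownloadTmpDir, each gateway session gets an isolated download
directory, so ResourceManager#close can delete one session's downloaded jars without
touching another's. A worked example of the derivation, with made-up values for the
base directory and session handle:

    // configuration.get(TableConfigOptions.RESOURCE_DOWNLOAD_DIR) = /tmp/flink-dl   (example)
    // sessionId                                                   = 6b5d9a6e-...    (example)
    Path sessionDownloadDir =
            Paths.get("/tmp/flink-dl", String.format("sql-gateway-%s", "6b5d9a6e-..."));
    // -> /tmp/flink-dl/sql-gateway-6b5d9a6e-...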
diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
index 16c52b77be2..06c58c5e473 100644
--- a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
@@ -57,6 +57,7 @@ import org.apache.flink.table.operations.ExternalQueryOperation;
 import org.apache.flink.table.operations.ModifyOperation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.table.typeutils.FieldInfoUtils;
@@ -78,22 +79,22 @@ public abstract class AbstractStreamTableEnvironmentImpl extends TableEnvironmen
     public AbstractStreamTableEnvironmentImpl(
             CatalogManager catalogManager,
             ModuleManager moduleManager,
+            ResourceManager resourceManager,
             TableConfig tableConfig,
             Executor executor,
             FunctionCatalog functionCatalog,
             Planner planner,
             boolean isStreamingMode,
-            ClassLoader userClassLoader,
             StreamExecutionEnvironment executionEnvironment) {
         super(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 tableConfig,
                 executor,
                 functionCatalog,
                 planner,
-                isStreamingMode,
-                userClassLoader);
+                isStreamingMode);
         this.executionEnvironment = executionEnvironment;
     }
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index cd8e4c4c622..716bfe806b1 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -46,14 +46,18 @@ import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.Preconditions;
 
+import java.net.URL;
 import java.util.Arrays;
 import java.util.Optional;
 
@@ -70,40 +74,43 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
     public StreamTableEnvironmentImpl(
             CatalogManager catalogManager,
             ModuleManager moduleManager,
+            ResourceManager resourceManager,
             FunctionCatalog functionCatalog,
             TableConfig tableConfig,
             StreamExecutionEnvironment executionEnvironment,
             Planner planner,
             Executor executor,
-            boolean isStreamingMode,
-            ClassLoader userClassLoader) {
+            boolean isStreamingMode) {
         super(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 tableConfig,
                 executor,
                 functionCatalog,
                 planner,
                 isStreamingMode,
-                userClassLoader,
                 executionEnvironment);
     }
 
     public static StreamTableEnvironment create(
             StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings) {
-        final ClassLoader classLoader = settings.getUserClassLoader();
-
-        final Executor executor = lookupExecutor(classLoader, executionEnvironment);
+        final MutableURLClassLoader userClassLoader =
+                FlinkUserCodeClassLoaders.create(
+                        new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
+        final Executor executor = lookupExecutor(userClassLoader, executionEnvironment);
 
         final TableConfig tableConfig = TableConfig.getDefault();
         tableConfig.setRootConfiguration(executor.getConfiguration());
         tableConfig.addConfiguration(settings.getConfiguration());
 
+        final ResourceManager resourceManager =
+                new ResourceManager(settings.getConfiguration(), userClassLoader);
         final ModuleManager moduleManager = new ModuleManager();
 
         final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
-                        .classLoader(classLoader)
+                        .classLoader(userClassLoader)
                         .config(tableConfig)
                         .defaultCatalog(
                                 settings.getBuiltInCatalogName(),
@@ -114,13 +121,13 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
                         .build();
 
         final FunctionCatalog functionCatalog =
-                new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);
+                new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor,
                         tableConfig,
-                        classLoader,
+                        userClassLoader,
                         moduleManager,
                         catalogManager,
                         functionCatalog);
@@ -128,13 +135,13 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron
         return new StreamTableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 functionCatalog,
                 tableConfig,
                 executionEnvironment,
                 planner,
                 executor,
-                settings.isStreamingMode(),
-                classLoader);
+                settings.isStreamingMode());
     }
 
     @Override
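
The bootstrap pattern above recurs in every factory method this commit touches: wrap
the caller's classloader in a fresh MutableURLClassLoader, hand it to a new
ResourceManager, and let planner, catalog manager and function catalog all resolve user
classes through resourceManager.getUserClassLoader(). A minimal sketch of that shared
wiring (the choice of parent classloader here is an assumption):

    Configuration conf = new Configuration();
    MutableURLClassLoader userClassLoader =
            FlinkUserCodeClassLoaders.create(
                    new URL[0], Thread.currentThread().getContextClassLoader(), conf);
    ResourceManager resourceManager = new ResourceManager(conf, userClassLoader);

    // From here on, user classes are loaded through the mutable loader, so jars
    // registered later become visible to all components at once.
    URLClassLoader loaderForComponents = resourceManager.getUserClassLoader();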
diff --git a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
index b2d4162c21d..4245c6193ca 100644
--- a/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
+++ b/flink-table/flink-table-api-java-bridge/src/test/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImplTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ModifyOperation;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.table.utils.ExecutorMock;
 import org.apache.flink.table.utils.PlannerMock;
@@ -34,6 +35,7 @@ import org.apache.flink.types.Row;
 
 import org.junit.jupiter.api.Test;
 
+import java.net.URL;
 import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
@@ -77,20 +79,22 @@ class StreamTableEnvironmentImplTest {
         TableConfig tableConfig = TableConfig.getDefault();
         CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
         ModuleManager moduleManager = new ModuleManager();
+        ResourceManager resourceManager =
+                ResourceManager.createResourceManager(
+                        new URL[0],
+                        Thread.currentThread().getContextClassLoader(),
+                        tableConfig.getConfiguration());
+
         return new StreamTableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
-                new FunctionCatalog(
-                        tableConfig,
-                        catalogManager,
-                        moduleManager,
-                        StreamTableEnvironmentImplTest.class.getClassLoader()),
+                resourceManager,
+                new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
                 tableConfig,
                 env,
                 new TestPlanner(elements.getTransformation()),
                 new ExecutorMock(),
-                true,
-                this.getClass().getClassLoader());
+                true);
     }
 
     private static class TestPlanner extends PlannerMock {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index f93782eb71e..68b97a0d279 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.table.api.CompiledPlan;
 import org.apache.flink.table.api.DataTypes;
@@ -145,6 +147,7 @@ import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceValidation;
@@ -155,10 +158,13 @@ import org.apache.flink.table.types.utils.DataTypeUtils;
 import org.apache.flink.table.utils.TableSchemaUtils;
 import org.apache.flink.table.utils.print.PrintStyle;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
+import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.Preconditions;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -167,6 +173,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -186,6 +193,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     private static final boolean IS_STREAM_TABLE = true;
     private final CatalogManager catalogManager;
     private final ModuleManager moduleManager;
+    private final ResourceManager resourceManager;
     private final OperationTreeBuilder operationTreeBuilder;
 
     protected final TableConfig tableConfig;
@@ -193,7 +201,6 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     protected final FunctionCatalog functionCatalog;
     protected final Planner planner;
     private final boolean isStreamingMode;
-    private final ClassLoader userClassLoader;
     private static final String UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG =
             "Unsupported SQL query! executeSql() only accepts a single SQL statement of type "
                     + "CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, "
@@ -207,14 +214,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     protected TableEnvironmentImpl(
             CatalogManager catalogManager,
             ModuleManager moduleManager,
+            ResourceManager resourceManager,
             TableConfig tableConfig,
             Executor executor,
             FunctionCatalog functionCatalog,
             Planner planner,
-            boolean isStreamingMode,
-            ClassLoader userClassLoader) {
+            boolean isStreamingMode) {
         this.catalogManager = catalogManager;
         this.moduleManager = moduleManager;
+        this.resourceManager = resourceManager;
         this.execEnv = executor;
 
         this.tableConfig = tableConfig;
@@ -222,11 +230,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         this.functionCatalog = functionCatalog;
         this.planner = planner;
         this.isStreamingMode = isStreamingMode;
-        this.userClassLoader = userClassLoader;
         this.operationTreeBuilder =
                 OperationTreeBuilder.create(
                         tableConfig,
-                        userClassLoader,
+                        resourceManager.getUserClassLoader(),
                         functionCatalog.asLookup(getParser()::parseIdentifier),
                         catalogManager.getDataTypeFactory(),
                         path -> {
@@ -260,11 +267,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     }
 
     public static TableEnvironmentImpl create(EnvironmentSettings settings) {
-        final ClassLoader classLoader = settings.getUserClassLoader();
+        final MutableURLClassLoader userClassLoader =
+                FlinkUserCodeClassLoaders.create(
+                        new URL[0], settings.getUserClassLoader(), settings.getConfiguration());
 
         final ExecutorFactory executorFactory =
                 FactoryUtil.discoverFactory(
-                        classLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
+                        userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
         final Executor executor = executorFactory.create(settings.getConfiguration());
 
         // use configuration to init table config
@@ -272,11 +281,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         tableConfig.setRootConfiguration(executor.getConfiguration());
         tableConfig.addConfiguration(settings.getConfiguration());
 
+        final ResourceManager resourceManager =
+                new ResourceManager(settings.getConfiguration(), userClassLoader);
         final ModuleManager moduleManager = new ModuleManager();
-
         final CatalogManager catalogManager =
                 CatalogManager.newBuilder()
-                        .classLoader(classLoader)
+                        .classLoader(userClassLoader)
                         .config(tableConfig)
                         .defaultCatalog(
                                 settings.getBuiltInCatalogName(),
@@ -286,13 +296,13 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                         .build();
 
         final FunctionCatalog functionCatalog =
-                new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader);
+                new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
 
         final Planner planner =
                 PlannerFactoryUtil.createPlanner(
                         executor,
                         tableConfig,
-                        classLoader,
+                        userClassLoader,
                         moduleManager,
                         catalogManager,
                         functionCatalog);
@@ -300,12 +310,12 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         return new TableEnvironmentImpl(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 tableConfig,
                 executor,
                 functionCatalog,
                 planner,
-                settings.isStreamingMode(),
-                classLoader);
+                settings.isStreamingMode());
     }
 
     @Override
@@ -791,6 +801,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
     private TableResultInternal executeInternal(
             List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
         final String defaultJobName = "insert-into_" + String.join(",", sinkIdentifierNames);
+
+        // Merge user jars to table configuration
+        mergePipelineJarsToConfig(
+                resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
+
         // We pass only the configuration to avoid reconfiguration with the rootConfiguration
         Pipeline pipeline =
                 execEnv.createPipeline(
@@ -822,6 +837,11 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         List<Transformation<?>> transformations =
                 translate(Collections.singletonList(sinkOperation));
         final String defaultJobName = "collect";
+
+        // Merge user jars to table configuration
+        mergePipelineJarsToConfig(
+                resourceManager.getLocalJarResources(), tableConfig.getConfiguration());
+
         // We pass only the configuration to avoid reconfiguration with the rootConfiguration
         Pipeline pipeline =
                 execEnv.createPipeline(
@@ -1349,7 +1369,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
             Catalog catalog =
                     FactoryUtil.createCatalog(
-                            catalogName, properties, tableConfig, userClassLoader);
+                            catalogName,
+                            properties,
+                            tableConfig,
+                            resourceManager.getUserClassLoader());
             catalogManager.registerCatalog(catalogName, catalog);
 
             return TableResultImpl.TABLE_RESULT_OK;
@@ -1366,7 +1389,7 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
                             operation.getModuleName(),
                             operation.getOptions(),
                             tableConfig,
-                            userClassLoader);
+                            resourceManager.getUserClassLoader());
             moduleManager.loadModule(operation.getModuleName(), module);
             return TableResultImpl.TABLE_RESULT_OK;
         } catch (ValidationException e) {
@@ -1849,6 +1872,15 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         }
     }
 
+    private void mergePipelineJarsToConfig(Set<URL> jarUrls, Configuration configuration) {
+        ConfigUtils.mergeCollectionsToConfig(
+                configuration,
+                PipelineOptions.JARS,
+                jarUrls.stream().map(URL::toString).collect(Collectors.toSet()),
+                String::toString,
+                String::toString);
+    }
+
     @VisibleForTesting
     public TableImpl createTable(QueryOperation tableOperation) {
         return TableImpl.createTable(
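
The two new mergePipelineJarsToConfig call sites are what ship the registered jars with
the job: right before a pipeline is built, the locally materialized jar URLs are folded
into 'pipeline.jars'. This also explains the set.q expectation change above:
'pipeline.jars' now stays empty in the session configuration and is only populated at
execution time. A condensed sketch of the merge step (the jar URL is a placeholder):

    Set<URL> jarUrls =
            Collections.singleton(new URL("file:/tmp/udf.jar")); // placeholder
    Configuration configuration = new Configuration();

    // Fold the jar URLs into 'pipeline.jars' on the configuration that is
    // later passed to Executor#createPipeline.
    ConfigUtils.mergeCollectionsToConfig(
            configuration,
            PipelineOptions.JARS,
            jarUrls.stream().map(URL::toString).collect(Collectors.toSet()),
            String::toString,
            String::toString);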
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
index bd624a37b7d..6a6c0c940ff 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/FunctionCatalog.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.functions.TableFunctionDefinition;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.util.Preconditions;
 
@@ -65,9 +66,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public final class FunctionCatalog {
     private final ReadableConfig config;
+    private final ResourceManager resourceManager;
     private final CatalogManager catalogManager;
     private final ModuleManager moduleManager;
-    private ClassLoader classLoader;
 
     private final Map<String, CatalogFunction> tempSystemFunctions = new LinkedHashMap<>();
     private final Map<ObjectIdentifier, CatalogFunction> tempCatalogFunctions =
@@ -81,18 +82,13 @@ public final class FunctionCatalog {
 
     public FunctionCatalog(
             ReadableConfig config,
+            ResourceManager resourceManager,
             CatalogManager catalogManager,
-            ModuleManager moduleManager,
-            ClassLoader classLoader) {
+            ModuleManager moduleManager) {
         this.config = checkNotNull(config);
+        this.resourceManager = checkNotNull(resourceManager);
         this.catalogManager = checkNotNull(catalogManager);
         this.moduleManager = checkNotNull(moduleManager);
-        this.classLoader = classLoader;
-    }
-
-    /** Updates the classloader, this is a temporary solution until FLINK-14055 is fixed. */
-    public void updateClassLoader(ClassLoader classLoader) {
-        this.classLoader = classLoader;
     }
 
     public void setPlannerTypeInferenceUtil(PlannerTypeInferenceUtil plannerTypeInferenceUtil) {
@@ -159,7 +155,7 @@ public final class FunctionCatalog {
                                         normalizedIdentifier.toObjectPath(), catalogFunction);
             }
             try {
-                validateAndPrepareFunction(catalogFunction);
+                validateAndPrepareFunction(identifier.asSummaryString(), catalogFunction);
             } catch (Throwable t) {
                 throw new ValidationException(
                         String.format(
@@ -506,7 +502,7 @@ public final class FunctionCatalog {
         final String normalizedName = FunctionIdentifier.normalizeName(name);
 
         try {
-            validateAndPrepareFunction(function);
+            validateAndPrepareFunction(name, function);
         } catch (Throwable t) {
             throw new ValidationException(
                     String.format(
@@ -634,7 +630,7 @@ public final class FunctionCatalog {
     }
 
     @SuppressWarnings("unchecked")
-    private void validateAndPrepareFunction(CatalogFunction function)
+    private void validateAndPrepareFunction(String name, CatalogFunction function)
             throws ClassNotFoundException {
         // If the input is instance of UserDefinedFunction, it means it uses the new type inference.
         // In this situation the UDF have not been validated and cleaned, so we need to validate it
@@ -649,9 +645,15 @@ public final class FunctionCatalog {
             }
             // Skip validation if it's not a UserDefinedFunction.
         } else if (function.getFunctionLanguage() == FunctionLanguage.JAVA) {
+            // If the UDF declares jar resources, register them with the classloader
+            // before validation.
+            registerFunctionJarResources(name, function.getFunctionResources());
+
             UserDefinedFunctionHelper.validateClass(
                     (Class<? extends UserDefinedFunction>)
-                            classLoader.loadClass(function.getClassName()));
+                            resourceManager
+                                    .getUserClassLoader()
+                                    .loadClass(function.getClassName()));
         }
     }
 
@@ -662,15 +664,33 @@ public final class FunctionCatalog {
             // directly.
             return ((InlineCatalogFunction) function).getDefinition();
         }
+        // If the UDF declares jar resources, register them with the classloader
+        // before instantiation.
+        registerFunctionJarResources(name, function.getFunctionResources());
+
         return UserDefinedFunctionHelper.instantiateFunction(
-                classLoader,
+                resourceManager.getUserClassLoader(),
                 // future
                 config,
                 name,
                 function);
     }
 
-    /** The CatalogFunction which holds a instantiated UDF. */
+    private void registerFunctionJarResources(String functionName, List<ResourceUri> resourceUris) {
+        try {
+            if (!resourceUris.isEmpty()) {
+                resourceManager.registerJarResources(resourceUris);
+            }
+        } catch (Exception e) {
+            throw new TableException(
+                    String.format(
+                            "Failed to register jar resource '%s' of function '%s'.",
+                            resourceUris, functionName),
+                    e);
+        }
+    }
+
+    /** The CatalogFunction which holds an instantiated UDF. */
     public static class InlineCatalogFunction implements CatalogFunction {
 
         private final FunctionDefinition definition;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
index 49be84749ce..2596ed056f4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/resource/ResourceManager.java
@@ -23,8 +23,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.FlinkUserCodeClassLoaders;
 import org.apache.flink.util.JarUtils;
 import org.apache.flink.util.MutableURLClassLoader;
 
@@ -41,6 +44,7 @@ import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -52,9 +56,19 @@ public class ResourceManager implements Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(ResourceManager.class);
 
+    private static final String JAR_SUFFIX = "jar";
+    private static final String FILE_SCHEME = "file";
+
     private final Path localResourceDir;
-    private final Map<ResourceUri, URL> resourceInfos;
-    private final MutableURLClassLoader userClassLoader;
+    protected final Map<ResourceUri, URL> resourceInfos;
+    protected final MutableURLClassLoader userClassLoader;
+
+    public static ResourceManager createResourceManager(
+            URL[] urls, ClassLoader parent, Configuration configuration) {
+        MutableURLClassLoader mutableURLClassLoader =
+                FlinkUserCodeClassLoaders.create(urls, parent, configuration);
+        return new ResourceManager(configuration, mutableURLClassLoader);
+    }
 
     public ResourceManager(Configuration config, MutableURLClassLoader userClassLoader) {
         this.localResourceDir =
@@ -65,78 +79,149 @@ public class ResourceManager implements Closeable {
         this.userClassLoader = userClassLoader;
     }
 
-    public URLClassLoader getUserClassLoader() {
-        return userClassLoader;
-    }
+    /**
+     * Registers the given jar resources. Since registering any single resource may fail, all
+     * resources are checked and staged before any of them is actually registered. Only if every
+     * resource is available are they registered with the {@link ResourceManager}.
+     */
+    public void registerJarResources(List<ResourceUri> resourceUris) throws IOException {
+        // check jar resource before register
+        checkJarResources(resourceUris);
 
-    public void registerResource(ResourceUri resourceUri) throws IOException {
-        // check whether the resource has been registered
-        if (resourceInfos.containsKey(resourceUri)) {
-            LOG.info(
-                    "Resource [{}] has been registered, overwriting of registered resource is not supported "
-                            + "in the current version, skipping.",
-                    resourceUri.getUri());
-            return;
-        }
+        Map<ResourceUri, URL> stagingResourceLocalURLs = new HashMap<>();
+        for (ResourceUri resourceUri : resourceUris) {
+            // check whether the resource has been registered
+            if (resourceInfos.containsKey(resourceUri) && resourceInfos.get(resourceUri) != null) {
+                LOG.info(
+                        "Resource [{}] has been registered, overwriting of registered resource is not supported "
+                                + "in the current version, skipping.",
+                        resourceUri.getUri());
+                continue;
+            }
 
-        // here can check whether the resource path is valid
-        Path path = new Path(resourceUri.getUri());
-        // check resource firstly
-        checkResource(path);
-
-        URL localUrl;
-        // check resource scheme
-        String scheme = StringUtils.lowerCase(path.toUri().getScheme());
-        // download resource to local path firstly if in remote
-        if (scheme != null && !"file".equals(scheme)) {
-            localUrl = downloadResource(path);
-        } else {
-            localUrl = getURLFromPath(path);
-        }
+            // check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            URL localUrl;
+            // check resource scheme
+            String scheme = StringUtils.lowerCase(path.toUri().getScheme());
+            // download resource to local path firstly if in remote
+            if (scheme != null && !FILE_SCHEME.equals(scheme)) {
+                localUrl = downloadResource(path);
+            } else {
+                localUrl = getURLFromPath(path);
+                // if the local jar resource path is relative, convert it to an absolute
+                // path before registering
+                resourceUri = new ResourceUri(ResourceType.JAR, localUrl.getPath());
+            }
 
-        // only need add jar resource to classloader
-        if (ResourceType.JAR.equals(resourceUri.getResourceType())) {
-            // check the Jar file firstly
+            // check the local jar file
             JarUtils.checkJarFile(localUrl);
 
-            // add it to classloader
-            userClassLoader.addURL(localUrl);
-            LOG.info("Added jar resource [{}] to class path.", localUrl);
+            // add it to staging map
+            stagingResourceLocalURLs.put(resourceUri, localUrl);
         }
 
-        resourceInfos.put(resourceUri, localUrl);
-        LOG.info("Register resource [{}] successfully.", resourceUri.getUri());
+        // register the staged resources in one batch
+        stagingResourceLocalURLs.forEach(
+                (resourceUri, url) -> {
+                    // jar resources need to be added to the classloader
+                    userClassLoader.addURL(url);
+                    LOG.info("Added jar resource [{}] to class path.", url);
+
+                    resourceInfos.put(resourceUri, url);
+                    LOG.info("Register resource [{}] successfully.", resourceUri.getUri());
+                });
+    }
+
+    public URLClassLoader getUserClassLoader() {
+        return userClassLoader;
     }
 
     public Map<ResourceUri, URL> getResources() {
         return Collections.unmodifiableMap(resourceInfos);
     }
 
-    public Set<URL> getJarResourceURLs() {
+    /**
+     * Returns the URLs of the registered local jar resources. For a remote jar this is the URL
+     * of its downloaded copy in the local file system; for a local jar it is the registered URL.
+     */
+    public Set<URL> getLocalJarResources() {
         return resourceInfos.entrySet().stream()
                 .filter(entry -> ResourceType.JAR.equals(entry.getKey().getResourceType()))
                 .map(Map.Entry::getValue)
                 .collect(Collectors.toSet());
     }
 
-    private void checkResource(Path path) throws IOException {
-        FileSystem fs = path.getFileSystem();
-        // check resource exists firstly
-        if (!fs.exists(path)) {
-            throw new FileNotFoundException(String.format("Resource [%s] not found.", path));
+    @Override
+    public void close() throws IOException {
+        resourceInfos.clear();
+
+        IOException exception = null;
+        try {
+            userClassLoader.close();
+        } catch (IOException e) {
+            LOG.debug("Error while closing user classloader.", e);
+            exception = e;
         }
 
-        // register directory is not allowed for resource
-        if (fs.getFileStatus(path).isDir()) {
-            throw new IOException(
+        FileSystem fileSystem = FileSystem.getLocalFileSystem();
+        try {
+            if (fileSystem.exists(localResourceDir)) {
+                fileSystem.delete(localResourceDir, true);
+            }
+        } catch (IOException ioe) {
+            LOG.debug(String.format("Error while deleting directory [%s].", localResourceDir), ioe);
+            exception = ExceptionUtils.firstOrSuppressed(ioe, exception);
+        }
+
+        if (exception != null) {
+            throw exception;
+        }
+    }
+
+    private void checkJarResources(List<ResourceUri> resourceUris) throws IOException {
+        // only jar resources are supported for registration
+        if (resourceUris.stream()
+                .anyMatch(resourceUri -> !ResourceType.JAR.equals(resourceUri.getResourceType()))) {
+            throw new ValidationException(
                     String.format(
-                            "The resource [%s] is a directory, however, the directory is not allowed for registering resource.",
-                            path));
+                            "Only support to register jar resource, resource info:\n %s.",
+                            resourceUris.stream()
+                                    .map(ResourceUri::getUri)
+                                    .collect(Collectors.joining(",\n"))));
+        }
+
+        for (ResourceUri resourceUri : resourceUris) {
+            // check whether the resource path is valid
+            Path path = new Path(resourceUri.getUri());
+            // the file name must end with the '.jar' suffix
+            String fileExtension = Files.getFileExtension(path.getName());
+            if (!fileExtension.toLowerCase().endsWith(JAR_SUFFIX)) {
+                throw new ValidationException(
+                        String.format(
+                                "The registering jar resource [%s] must ends with '.jar' suffix.",
+                                path));
+            }
+
+            FileSystem fs = FileSystem.getUnguardedFileSystem(path.toUri());
+            // check that the resource exists first
+            if (!fs.exists(path)) {
+                throw new FileNotFoundException(
+                        String.format("Jar resource [%s] not found.", path));
+            }
+
+            // registering a directory as a resource is not allowed
+            if (fs.getFileStatus(path).isDir()) {
+                throw new ValidationException(
+                        String.format(
+                                "The registering jar resource [%s] is a directory that is not allowed.",
+                                path));
+            }
         }
     }
 
     @VisibleForTesting
-    protected URL downloadResource(Path remotePath) throws IOException {
+    URL downloadResource(Path remotePath) throws IOException {
         // get local resource path
         Path localPath = getResourceLocalPath(remotePath);
         try {
@@ -155,6 +240,20 @@ public class ResourceManager implements Closeable {
         return getURLFromPath(localPath);
     }
 
+    @VisibleForTesting
+    URL getURLFromPath(Path path) throws IOException {
+        // if scheme is null, rewrite it to file
+        if (path.toUri().getScheme() == null) {
+            path = path.makeQualified(FileSystem.getLocalFileSystem());
+        }
+        return path.toUri().toURL();
+    }
+
+    @VisibleForTesting
+    Path getLocalResourceDir() {
+        return localResourceDir;
+    }
+
     private Path getResourceLocalPath(Path remotePath) {
         String fileName = remotePath.getName();
         String fileExtension = Files.getFileExtension(fileName);
@@ -172,26 +271,4 @@ public class ResourceManager implements Closeable {
         }
         return new Path(localResourceDir, fileNameWithUUID);
     }
-
-    @VisibleForTesting
-    protected URL getURLFromPath(Path path) throws IOException {
-        // if scheme is null, rewrite it to file
-        if (path.toUri().getScheme() == null) {
-            path = path.makeQualified(FileSystem.getLocalFileSystem());
-        }
-        return path.toUri().toURL();
-    }
-
-    @Override
-    public void close() throws IOException {
-        // close classloader
-        userClassLoader.close();
-        // clear the map
-        resourceInfos.clear();
-        // delete the local resource
-        FileSystem fileSystem = localResourceDir.getFileSystem();
-        if (fileSystem.exists(localResourceDir)) {
-            fileSystem.delete(localResourceDir, true);
-        }
-    }
 }
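
End to end, the reworked ResourceManager is used like this: create it over a fresh
mutable classloader, register jars (checked and staged as a batch, so a bad URI fails
the whole call before the classloader is touched), and close it to release the
classloader and delete any downloaded copies. A minimal sketch against the methods
above (the jar path is a placeholder and exception handling is elided):

    Configuration conf = new Configuration();
    ResourceManager resourceManager =
            ResourceManager.createResourceManager(
                    new URL[0], Thread.currentThread().getContextClassLoader(), conf);
    try {
        // Remote URIs (e.g. hdfs://...) are downloaded into the local resource
        // directory first; plain local paths are registered in place.
        resourceManager.registerJarResources(
                Collections.singletonList(
                        new ResourceUri(ResourceType.JAR, "/tmp/udf.jar"))); // placeholder
        Set<URL> localJars = resourceManager.getLocalJarResources();
    } finally {
        resourceManager.close(); // closes the classloader, deletes downloads
    }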
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
index a60ea2f1348..fa9baacbd9e 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/FunctionCatalogTest.java
@@ -27,11 +27,13 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.net.URL;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Optional;
@@ -83,14 +85,18 @@ class FunctionCatalogTest {
 
         moduleManager = new ModuleManager();
 
+        Configuration configuration = new Configuration();
         functionCatalog =
                 new FunctionCatalog(
-                        new Configuration(),
+                        configuration,
+                        ResourceManager.createResourceManager(
+                                new URL[0],
+                                FunctionCatalogTest.class.getClassLoader(),
+                                configuration),
                         CatalogManagerMocks.preparedCatalogManager()
                                 .defaultCatalog(DEFAULT_CATALOG, catalog)
                                 .build(),
-                        moduleManager,
-                        FunctionCatalogTest.class.getClassLoader());
+                        moduleManager);
     }
 
     @Test
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
index b6f43d5df5d..df11a571cc5 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/resource/ResourceManagerTest.java
@@ -19,73 +19,112 @@
 package org.apache.flink.table.resource;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkUserCodeClassLoaders;
-import org.apache.flink.util.MutableURLClassLoader;
 import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.net.URL;
 import java.net.URLClassLoader;
+import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
-import static org.apache.flink.util.FlinkUserCodeClassLoader.NOOP_EXCEPTION_HANDLER;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 
 /** Tests for {@link ResourceManager}. */
 public class ResourceManagerTest {
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-    public static final String LOWER_UDF_CLASS = "LowerUDF";
-    public static final String LOWER_UDF_CODE =
-            "public class "
-                    + LOWER_UDF_CLASS
-                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
-                    + "  public String eval(String str) {\n"
-                    + "    return str.toLowerCase();\n"
-                    + "  }\n"
-                    + "}\n";
-
+    @TempDir private static File tempFolder;
     private static File udfJar;
 
-    @BeforeClass
+    private ResourceManager resourceManager;
+
+    @BeforeAll
     public static void prepare() throws Exception {
         udfJar =
                 UserClassLoaderJarTestUtils.createJarFile(
-                        temporaryFolder.newFolder("test-jar"),
+                        tempFolder,
                         "test-classloader-udf.jar",
-                        LOWER_UDF_CLASS,
-                        LOWER_UDF_CODE);
+                        GENERATED_LOWER_UDF_CLASS,
+                        String.format(GENERATED_LOWER_UDF_CODE, GENERATED_LOWER_UDF_CLASS));
+    }
+
+    @BeforeEach
+    public void before() {
+        resourceManager =
+                ResourceManager.createResourceManager(
+                        new URL[0], getClass().getClassLoader(), new Configuration());
+    }
+
+    @AfterEach
+    public void after() throws Exception {
+        resourceManager.close();
     }
 
     @Test
     public void testRegisterResource() throws Exception {
-        ResourceManager resourceManager = createResourceManager(new URL[0]);
         URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
 
         // test class loading before register resource
         CommonTestUtils.assertThrows(
-                String.format("LowerUDF"),
+                "LowerUDF",
                 ClassNotFoundException.class,
-                () -> Class.forName(LOWER_UDF_CLASS, false, userClassLoader));
+                () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader));
 
+        ResourceUri resourceUri = new ResourceUri(ResourceType.JAR, udfJar.getPath());
         // register the same jar repeatedly
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, udfJar.getPath()));
-        resourceManager.registerResource(new ResourceUri(ResourceType.JAR, udfJar.getPath()));
+        resourceManager.registerJarResources(Arrays.asList(resourceUri, resourceUri));
+
+        // assert resource infos
+        Map<ResourceUri, URL> expected =
+                Collections.singletonMap(
+                        resourceUri, resourceManager.getURLFromPath(new Path(udfJar.getPath())));
+
+        assertEquals(expected, resourceManager.getResources());
+
+        // test load class
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+
+        assertEquals(clazz1, clazz2);
+    }
+
+    @Test
+    public void testRegisterResourceWithRelativePath() throws Exception {
+        URLClassLoader userClassLoader = resourceManager.getUserClassLoader();
+
+        // test class loading before register resource
+        CommonTestUtils.assertThrows(
+                "LowerUDF",
+                ClassNotFoundException.class,
+                () -> Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader));
+
+        ResourceUri resourceUri =
+                new ResourceUri(
+                        ResourceType.JAR,
+                        new File(".")
+                                .getCanonicalFile()
+                                .toPath()
+                                .relativize(udfJar.toPath())
+                                .toString());
+        // register jar
+        resourceManager.registerJarResources(Collections.singletonList(resourceUri));
 
         // assert resource infos
         Map<ResourceUri, URL> expected =
@@ -96,51 +135,58 @@ public class ResourceManagerTest {
         assertEquals(expected, resourceManager.getResources());
 
         // test load class
-        final Class<?> clazz1 = Class.forName(LOWER_UDF_CLASS, false, userClassLoader);
-        final Class<?> clazz2 = Class.forName(LOWER_UDF_CLASS, false, userClassLoader);
+        final Class<?> clazz1 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
+        final Class<?> clazz2 = Class.forName(GENERATED_LOWER_UDF_CLASS, false, userClassLoader);
 
         assertEquals(clazz1, clazz2);
-
-        resourceManager.close();
     }
 
     @Test
     public void testRegisterInvalidResource() throws Exception {
-        ResourceManager resourceManager = createResourceManager(new URL[0]);
-
-        // test register non-exist file
-        final String fileUri =
-                temporaryFolder.getRoot().getPath() + Path.SEPARATOR + "test-non-exist-file";
+        final String fileUri = tempFolder.getAbsolutePath() + Path.SEPARATOR + "test-file";
 
         CommonTestUtils.assertThrows(
-                String.format("Resource [%s] not found.", fileUri),
-                FileNotFoundException.class,
+                String.format(
+                        "Only support to register jar resource, resource info:\n %s.", fileUri),
+                ValidationException.class,
                 () -> {
-                    resourceManager.registerResource(new ResourceUri(ResourceType.FILE, fileUri));
+                    resourceManager.registerJarResources(
+                            Collections.singletonList(new ResourceUri(ResourceType.FILE, fileUri)));
                     return null;
                 });
 
         // test register directory for jar resource
-        final String jarUri = temporaryFolder.newFolder("test-jar-dir").getPath();
+        final String jarDir = tempFolder.getPath();
 
         CommonTestUtils.assertThrows(
                 String.format(
-                        "The resource [%s] is a directory, however, the directory is not allowed for registering resource.",
-                        jarUri),
-                IOException.class,
+                        "The registering jar resource [%s] must ends with '.jar' suffix.", jarDir),
+                ValidationException.class,
                 () -> {
-                    resourceManager.registerResource(new ResourceUri(ResourceType.JAR, jarUri));
+                    resourceManager.registerJarResources(
+                            Collections.singletonList(new ResourceUri(ResourceType.JAR, jarDir)));
                     return null;
                 });
 
-        resourceManager.close();
+        // test register a directory whose name ends with '.jar'
+        String jarPath =
+                Files.createDirectory(Paths.get(tempFolder.getPath(), "test-jar.jar")).toString();
+
+        CommonTestUtils.assertThrows(
+                String.format(
+                        "The registering jar resource [%s] is a directory that is not allowed.",
+                        jarPath),
+                ValidationException.class,
+                () -> {
+                    resourceManager.registerJarResources(
+                            Collections.singletonList(new ResourceUri(ResourceType.JAR, jarPath)));
+                    return null;
+                });
     }
 
     @Test
     public void testDownloadResource() throws Exception {
         Path srcPath = new Path(udfJar.getPath());
-        ResourceManager resourceManager = createResourceManager(new URL[0]);
-
         // test download resource to local path
         URL localUrl = resourceManager.downloadResource(srcPath);
 
@@ -148,35 +194,12 @@ public class ResourceManagerTest {
         byte[] actual = FileUtils.readAllBytes(Paths.get(localUrl.toURI()));
 
         assertArrayEquals(expected, actual);
-
-        resourceManager.close();
-    }
-
-    private ResourceManager createResourceManager(URL[] urls) {
-        Configuration configuration = new Configuration();
-        MutableURLClassLoader mutableURLClassLoader =
-                createClassLoader(configuration, urls, getClass().getClassLoader());
-        return new ResourceManager(configuration, mutableURLClassLoader);
     }
 
-    private MutableURLClassLoader createClassLoader(
-            Configuration configuration, URL[] urls, ClassLoader parentClassLoader) {
-        final String[] alwaysParentFirstLoaderPatterns =
-                CoreOptions.getParentFirstLoaderPatterns(configuration);
-
-        // According to the specified class load order,child-first or parent-first
-        // child-first: load from this classloader firstly, if not found, then from parent
-        // parent-first: load from parent firstly, if not found, then from this classloader
-        final String classLoaderResolveOrder =
-                configuration.getString(CoreOptions.CLASSLOADER_RESOLVE_ORDER);
-        FlinkUserCodeClassLoaders.ResolveOrder resolveOrder =
-                FlinkUserCodeClassLoaders.ResolveOrder.fromString(classLoaderResolveOrder);
-        return FlinkUserCodeClassLoaders.create(
-                resolveOrder,
-                urls,
-                parentClassLoader,
-                alwaysParentFirstLoaderPatterns,
-                NOOP_EXCEPTION_HANDLER,
-                true);
+    @Test
+    public void testCloseResourceManagerCleanDownloadedResources() throws Exception {
+        resourceManager.close();
+        FileSystem fileSystem = FileSystem.getLocalFileSystem();
+        assertFalse(fileSystem.exists(resourceManager.getLocalResourceDir()));
     }
 }
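
Note: the end-to-end behavior these unit tests back is the DDL itself; a
condensed sketch of the flow that the planner ITCases further below exercise,
with illustrative function, class, and jar names:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class UsingJarSketch {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // the function class lives only inside the jar, not on the classpath
            tEnv.executeSql(
                    "CREATE TEMPORARY SYSTEM FUNCTION lowerUdf AS 'LowerUDF' "
                            + "USING JAR '/tmp/test-classloader-udf.jar'");
            // once registered, the function behaves like any built-in scalar
            tEnv.executeSql("SELECT lowerUdf('HELLO FLINK')").print();
            tEnv.executeSql("DROP TEMPORARY SYSTEM FUNCTION lowerUdf");
        }
    }
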
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
index 0dec32abd5a..6a6384f90a3 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/TableEnvironmentMock.java
@@ -25,6 +25,9 @@ import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.table.resource.ResourceManager;
+
+import java.net.URL;
 
 /** Mocking {@link TableEnvironment} for tests. */
 public class TableEnvironmentMock extends TableEnvironmentImpl {
@@ -40,6 +43,7 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
     protected TableEnvironmentMock(
             CatalogManager catalogManager,
             ModuleManager moduleManager,
+            ResourceManager userResourceManager,
             TableConfig tableConfig,
             ExecutorMock executor,
             FunctionCatalog functionCatalog,
@@ -48,13 +52,12 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
         super(
                 catalogManager,
                 moduleManager,
+                userResourceManager,
                 tableConfig,
                 executor,
                 functionCatalog,
                 planner,
-                isStreamingMode,
-                TableEnvironmentMock.class.getClassLoader());
-
+                isStreamingMode);
         this.catalogManager = catalogManager;
         this.executor = executor;
         this.functionCatalog = functionCatalog;
@@ -73,12 +76,19 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
         final TableConfig tableConfig = TableConfig.getDefault();
         final CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
         final ModuleManager moduleManager = new ModuleManager();
+        final ResourceManager resourceManager =
+                ResourceManager.createResourceManager(
+                        new URL[0],
+                        Thread.currentThread().getContextClassLoader(),
+                        tableConfig.getConfiguration());
+
         return new TableEnvironmentMock(
                 catalogManager,
                 moduleManager,
+                resourceManager,
                 tableConfig,
                 createExecutor(),
-                createFunctionCatalog(tableConfig, catalogManager, moduleManager),
+                createFunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
                 createPlanner(),
                 isStreamingMode);
     }
@@ -88,9 +98,11 @@ public class TableEnvironmentMock extends TableEnvironmentImpl {
     }
 
     private static FunctionCatalog createFunctionCatalog(
-            ReadableConfig config, CatalogManager catalogManager, ModuleManager moduleManager) {
-        return new FunctionCatalog(
-                config, catalogManager, moduleManager, TableEnvironmentMock.class.getClassLoader());
+            ReadableConfig config,
+            ResourceManager resourceManager,
+            CatalogManager catalogManager,
+            ModuleManager moduleManager) {
+        return new FunctionCatalog(config, resourceManager, catalogManager, moduleManager);
     }
 
     private static PlannerMock createPlanner() {
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
similarity index 81%
rename from flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
rename to flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
index d0cd826162f..81ef13dc23a 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/UserDefinedFunctions.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/UserDefinedFunctions.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.client.gateway.utils;
+package org.apache.flink.table.utils;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -29,16 +29,25 @@ import org.apache.flink.types.Row;
 /** A bunch of UDFs for testing the SQL Client. */
 public class UserDefinedFunctions {
 
-    public static final String GENERATED_UDF_CLASS = "LowerUDF";
+    public static final String GENERATED_LOWER_UDF_CLASS = "LowerUDF";
+    public static final String GENERATED_UPPER_UDF_CLASS = "UpperUDF";
 
-    public static final String GENERATED_UDF_CODE =
+    public static final String GENERATED_LOWER_UDF_CODE =
             "public class "
-                    + GENERATED_UDF_CLASS
+                    + "%s"
                     + " extends org.apache.flink.table.functions.ScalarFunction {\n"
                     + "  public String eval(String str) {\n"
                     + "    return str.toLowerCase();\n"
                     + "  }\n"
                     + "}\n";
+    public static final String GENERATED_UPPER_UDF_CODE =
+            "public class "
+                    + "%s"
+                    + " extends org.apache.flink.table.functions.ScalarFunction {\n"
+                    + "  public String eval(String str) {\n"
+                    + "    return str.toUpperCase();\n"
+                    + "  }\n"
+                    + "}\n";
 
     /** The scalar function for SQL Client test. */
     public static class ScalarUDF extends ScalarFunction {
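
Note: with the class name turned into a %s placeholder, tests can generate
uniquely named UDF classes and pack them into a jar. A short sketch using the
same test helper; the "42" suffix is illustrative only:

    import java.io.File;

    import org.apache.flink.util.UserClassLoaderJarTestUtils;

    import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
    import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;

    class GeneratedUdfJarSketch {
        static File createUdfJar(File targetDir) throws Exception {
            // each caller can pick a unique class name for isolation
            String className = GENERATED_LOWER_UDF_CLASS + "42";
            String source = String.format(GENERATED_LOWER_UDF_CODE, className);
            return UserClassLoaderJarTestUtils.createJarFile(
                    targetDir, "test-classloader-udf.jar", className, source);
        }
    }
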
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 83d78a569d7..abea62f3261 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -33,12 +33,14 @@ import org.apache.flink.table.factories.PlannerFactoryUtil
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper}
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations._
+import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
+import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader, Preconditions}
 
+import java.net.URL
 import java.util.Optional
 
 import scala.collection.JavaConverters._
@@ -51,22 +53,22 @@ import scala.collection.JavaConverters._
 class StreamTableEnvironmentImpl(
     catalogManager: CatalogManager,
     moduleManager: ModuleManager,
+    resourceManager: ResourceManager,
     functionCatalog: FunctionCatalog,
     tableConfig: TableConfig,
     scalaExecutionEnvironment: StreamExecutionEnvironment,
     planner: Planner,
     executor: Executor,
-    isStreaming: Boolean,
-    userClassLoader: ClassLoader)
+    isStreaming: Boolean)
   extends AbstractStreamTableEnvironmentImpl(
     catalogManager,
     moduleManager,
+    resourceManager,
     tableConfig,
     executor,
     functionCatalog,
     planner,
     isStreaming,
-    userClassLoader,
     scalaExecutionEnvironment.getWrappedStreamExecutionEnvironment)
   with StreamTableEnvironment {
 
@@ -291,20 +293,24 @@ object StreamTableEnvironmentImpl {
   def create(
       executionEnvironment: StreamExecutionEnvironment,
       settings: EnvironmentSettings): StreamTableEnvironmentImpl = {
-    val classLoader = settings.getUserClassLoader
+    val userClassLoader: MutableURLClassLoader =
+      FlinkUserCodeClassLoaders.create(
+        new Array[URL](0),
+        settings.getUserClassLoader,
+        settings.getConfiguration)
 
     val executor = AbstractStreamTableEnvironmentImpl.lookupExecutor(
-      classLoader,
+      userClassLoader,
       executionEnvironment.getWrappedStreamExecutionEnvironment)
 
     val tableConfig = TableConfig.getDefault
     tableConfig.setRootConfiguration(executor.getConfiguration)
     tableConfig.addConfiguration(settings.getConfiguration)
 
+    val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader)
     val moduleManager = new ModuleManager
-
     val catalogManager = CatalogManager.newBuilder
-      .classLoader(classLoader)
+      .classLoader(userClassLoader)
       .config(tableConfig)
       .defaultCatalog(
         settings.getBuiltInCatalogName,
@@ -313,12 +319,12 @@ object StreamTableEnvironmentImpl {
       .build
 
     val functionCatalog =
-      new FunctionCatalog(tableConfig, catalogManager, moduleManager, classLoader)
+      new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager)
 
     val planner = PlannerFactoryUtil.createPlanner(
       executor,
       tableConfig,
-      classLoader,
+      userClassLoader,
       moduleManager,
       catalogManager,
       functionCatalog)
@@ -326,13 +332,13 @@ object StreamTableEnvironmentImpl {
     new StreamTableEnvironmentImpl(
       catalogManager,
       moduleManager,
+      resourceManager,
       functionCatalog,
       tableConfig,
       executionEnvironment,
       planner,
       executor,
-      settings.isStreamingMode,
-      classLoader
+      settings.isStreamingMode
     )
   }
 }
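
Note: the three-argument FlinkUserCodeClassLoaders.create used above derives
the resolve order and the always-parent-first patterns from the configuration;
a minimal Java sketch of the same call:

    import java.net.URL;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.FlinkUserCodeClassLoaders;
    import org.apache.flink.util.MutableURLClassLoader;

    class ClassLoaderSketch {
        static MutableURLClassLoader createUserClassLoader() {
            // child-first vs. parent-first resolution and the parent-first
            // patterns are read from the configuration inside the factory
            return FlinkUserCodeClassLoaders.create(
                    new URL[0],
                    ClassLoaderSketch.class.getClassLoader(),
                    new Configuration());
        }
    }
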
diff --git a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
index 7bf9259a8b6..5a2823a64dc 100644
--- a/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/test/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImplTest.scala
@@ -23,12 +23,14 @@ import org.apache.flink.table.api.TableConfig
 import org.apache.flink.table.catalog.FunctionCatalog
 import org.apache.flink.table.module.ModuleManager
 import org.apache.flink.table.operations.ModifyOperation
+import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.{CatalogManagerMocks, ExecutorMock, PlannerMock}
 import org.apache.flink.types.Row
 
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
 
+import java.net.URL
 import java.time.Duration
 import java.util.{Collections, List => JList}
 
@@ -70,20 +72,21 @@ class StreamTableEnvironmentImplTest {
     val tableConfig = TableConfig.getDefault
     val catalogManager = CatalogManagerMocks.createEmptyCatalogManager()
     val moduleManager = new ModuleManager
+    val resourceManager = ResourceManager.createResourceManager(
+      new Array[URL](0),
+      Thread.currentThread.getContextClassLoader,
+      tableConfig.getConfiguration)
+
     new StreamTableEnvironmentImpl(
       catalogManager,
       moduleManager,
-      new FunctionCatalog(
-        tableConfig,
-        catalogManager,
-        moduleManager,
-        Thread.currentThread().getContextClassLoader),
+      resourceManager,
+      new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager),
       tableConfig,
       env,
       new TestPlanner(elements.javaStream.getTransformation),
       new ExecutorMock,
-      true,
-      this.getClass.getClassLoader)
+      true)
   }
 
   private class TestPlanner(transformation: Transformation[_]) extends PlannerMock {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
index bb3210faa58..c9e9e6faeaa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.java
@@ -71,7 +71,7 @@ public class CompactManagedTableITCase extends BatchTestBase {
 
     @Override
     @Before
-    public void before() {
+    public void before() throws Exception {
         super.before();
         MANAGED_TABLES.put(tableIdentifier, new AtomicReference<>());
         referenceOfManagedTableFileEntries = new AtomicReference<>();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
index d7737b6c053..522eddc2eaa 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/ForwardHashExchangeITCase.java
@@ -35,7 +35,7 @@ import java.util.Arrays;
 public class ForwardHashExchangeITCase extends BatchTestBase {
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         super.before();
         env().getConfig().setDynamicGraph(true);
         env().disableOperatorChaining();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
new file mode 100644
index 00000000000..77aa2bfa9df
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/FunctionITCase.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.planner.factories.utils.TestCollectionTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for catalog and system functions in a table environment. */
+public class FunctionITCase extends BatchTestBase {
+
+    private static final Random random = new Random();
+    private String udfClassName;
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        udfClassName = GENERATED_LOWER_UDF_CLASS + random.nextInt(50);
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                udfClassName,
+                                String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+                        .toURI()
+                        .toString();
+    }
+
+    @Test
+    public void testCreateTemporarySystemFunctionByUsingJar() {
+        String ddl1 =
+                String.format(
+                        "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        String ddl2 =
+                String.format(
+                        "CREATE TEMPORARY SYSTEM FUNCTION f11 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl1);
+        tEnv().executeSql(ddl2);
+
+        List<String> functions = Arrays.asList(tEnv().listFunctions());
+        assertThat(functions).contains("f10");
+        assertThat(functions).contains("f11");
+
+        tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
+        tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f11");
+
+        functions = Arrays.asList(tEnv().listFunctions());
+        assertThat(functions).doesNotContain("f10");
+        assertThat(functions).doesNotContain("f11");
+    }
+
+    @Test
+    public void testCreateCatalogFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE FUNCTION default_database.f11 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl);
+        assertThat(Arrays.asList(tEnv().listFunctions())).contains("f11");
+
+        tEnv().executeSql("DROP FUNCTION default_database.f11");
+        assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f11");
+    }
+
+    @Test
+    public void testCreateTemporaryCatalogFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY FUNCTION default_database.f12 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl);
+        assertThat(Arrays.asList(tEnv().listFunctions())).contains("f12");
+
+        tEnv().executeSql("DROP TEMPORARY FUNCTION default_database.f12");
+        assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f12");
+    }
+
+    @Test
+    public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create temporary system function lowerUdf as '%s' using jar '%s'",
+                        udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop temporary system function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
+    @Test
+    public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
+    @Test
+    public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create temporary function lowerUdf as '%s' using jar '%s'",
+                        udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop temporary function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
+    private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
+            throws Exception {
+        List<Row> sourceData =
+                Arrays.asList(
+                        Row.of(1, "JARK"),
+                        Row.of(2, "RON"),
+                        Row.of(3, "LeoNard"),
+                        Row.of(1, "FLINK"),
+                        Row.of(2, "CDC"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+        String sinkDDL = "create table t2(a int, b varchar) with ('connector' = 'COLLECTION')";
+
+        String query = "select a, lowerUdf(b) from t1";
+
+        tEnv().executeSql(sourceDDL);
+        tEnv().executeSql(sinkDDL);
+        tEnv().executeSql(createFunctionDDL);
+        Table t2 = tEnv().sqlQuery(query);
+        t2.executeInsert("t2").await();
+
+        List<Row> result = TestCollectionTableFactory.RESULT();
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "jark"),
+                        Row.of(2, "ron"),
+                        Row.of(3, "leonard"),
+                        Row.of(1, "flink"),
+                        Row.of(2, "cdc"));
+        assertThat(result).isEqualTo(expected);
+
+        tEnv().executeSql("drop table t1");
+        tEnv().executeSql("drop table t2");
+        // drop the function to clean up
+        tEnv().executeSql(dropFunctionDDL);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
index 778bb09fefe..21b6e670a18 100755
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/agg/LocalAggregatePushDownITCase.java
@@ -36,7 +36,7 @@ import java.util.Collections;
 public class LocalAggregatePushDownITCase extends BatchTestBase {
 
     @Before
-    public void before() {
+    public void before() throws Exception {
         super.before();
         env().setParallelism(1); // set sink parallelism to 1
 
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
index bd55a390e1d..ea65c24de88 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java
@@ -52,7 +52,9 @@ import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.UserClassLoaderJarTestUtils;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import java.lang.invoke.MethodHandle;
@@ -65,10 +67,14 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CLASS;
+import static org.apache.flink.table.utils.UserDefinedFunctions.GENERATED_LOWER_UDF_CODE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assertions.fail;
@@ -76,7 +82,7 @@ import static org.assertj.core.api.Assertions.fail;
 /**
  * Tests for catalog and system functions in a table environment.
  *
- * <p>Note: This class is meant for testing the core function support. Use {@link
+ * <p>Note: This class is meant for testing the core function support. Use {@code
  * org.apache.flink.table.planner.functions.BuiltInFunctionTestBase} for testing individual function
  * implementations.
  */
@@ -84,6 +90,26 @@ public class FunctionITCase extends StreamingTestBase {
 
     private static final String TEST_FUNCTION = TestUDF.class.getName();
 
+    private static final Random random = new Random();
+    private String udfClassName;
+    private String jarPath;
+
+    @Before
+    @Override
+    public void before() throws Exception {
+        super.before();
+        udfClassName = GENERATED_LOWER_UDF_CLASS + random.nextInt(50);
+        jarPath =
+                UserClassLoaderJarTestUtils.createJarFile(
+                                TEMPORARY_FOLDER.newFolder(
+                                        String.format("test-jar-%s", UUID.randomUUID())),
+                                "test-classloader-udf.jar",
+                                udfClassName,
+                                String.format(GENERATED_LOWER_UDF_CODE, udfClassName))
+                        .toURI()
+                        .toString();
+    }
+
     @Test
     public void testCreateCatalogFunctionInDefaultCatalog() {
         String ddl1 = "create function f1 as 'org.apache.flink.function.TestFunction'";
@@ -198,6 +224,45 @@ public class FunctionITCase extends StreamingTestBase {
         tEnv().executeSql(ddl3);
     }
 
+    @Test
+    public void testCreateTemporarySystemFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY SYSTEM FUNCTION f10 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl);
+        assertThat(Arrays.asList(tEnv().listFunctions())).contains("f10");
+
+        tEnv().executeSql("DROP TEMPORARY SYSTEM FUNCTION f10");
+        assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f10");
+    }
+
+    @Test
+    public void testCreateCatalogFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE FUNCTION default_database.f11 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl);
+        assertThat(Arrays.asList(tEnv().listFunctions())).contains("f11");
+
+        tEnv().executeSql("DROP FUNCTION default_database.f11");
+        assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f11");
+    }
+
+    @Test
+    public void testCreateTemporaryCatalogFunctionByUsingJar() {
+        String ddl =
+                String.format(
+                        "CREATE TEMPORARY FUNCTION default_database.f12 AS '%s' USING JAR '%s'",
+                        udfClassName, jarPath);
+        tEnv().executeSql(ddl);
+        assertThat(Arrays.asList(tEnv().listFunctions())).contains("f12");
+
+        tEnv().executeSql("DROP TEMPORARY FUNCTION default_database.f12");
+        assertThat(Arrays.asList(tEnv().listFunctions())).doesNotContain("f12");
+    }
+
     @Test
     public void testAlterFunction() throws Exception {
         String create = "create function f3 as 'org.apache.flink.function.TestFunction'";
@@ -366,6 +431,38 @@ public class FunctionITCase extends StreamingTestBase {
         tEnv().executeSql(dropFunctionDDL);
     }
 
+    @Test
+    public void testUserDefinedTemporarySystemFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create temporary system function lowerUdf as '%s' using jar '%s'",
+                        udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop temporary system function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
+    @Test
+    public void testUserDefinedRegularCatalogFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create function lowerUdf as '%s' using jar '%s'", udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
+    @Test
+    public void testUserDefinedTemporaryCatalogFunctionByUsingJar() throws Exception {
+        String functionDDL =
+                String.format(
+                        "create temporary function lowerUdf as '%s' using jar '%s'",
+                        udfClassName, jarPath);
+
+        String dropFunctionDDL = "drop temporary function lowerUdf";
+        testUserDefinedFunctionByUsingJar(functionDDL, dropFunctionDDL);
+    }
+
     @Test
     public void testUserDefinedTemporarySystemFunction() throws Exception {
         String functionDDL = "create temporary system function addOne as '" + TEST_FUNCTION + "'";
@@ -409,12 +506,51 @@ public class FunctionITCase extends StreamingTestBase {
         Table t2 = tEnv().sqlQuery(query);
         t2.executeInsert("t2").await();
 
-        Row[] result = TestCollectionTableFactory.RESULT().toArray(new Row[0]);
-        Row[] expected = sourceData.toArray(new Row[0]);
+        List<Row> result = TestCollectionTableFactory.RESULT();
+        assertThat(result).isEqualTo(sourceData);
+
+        tEnv().executeSql("drop table t1");
+        tEnv().executeSql("drop table t2");
+    }
+
+    private void testUserDefinedFunctionByUsingJar(String createFunctionDDL, String dropFunctionDDL)
+            throws Exception {
+        List<Row> sourceData =
+                Arrays.asList(
+                        Row.of(1, "JARK"),
+                        Row.of(2, "RON"),
+                        Row.of(3, "LeoNard"),
+                        Row.of(1, "FLINK"),
+                        Row.of(2, "CDC"));
+
+        TestCollectionTableFactory.reset();
+        TestCollectionTableFactory.initData(sourceData);
+
+        String sourceDDL = "create table t1(a int, b varchar) with ('connector' = 'COLLECTION')";
+        String sinkDDL = "create table t2(a int, b varchar) with ('connector' = 'COLLECTION')";
+
+        String query = "select a, lowerUdf(b) from t1";
+
+        tEnv().executeSql(sourceDDL);
+        tEnv().executeSql(sinkDDL);
+        tEnv().executeSql(createFunctionDDL);
+        Table t2 = tEnv().sqlQuery(query);
+        t2.executeInsert("t2").await();
+
+        List<Row> result = TestCollectionTableFactory.RESULT();
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1, "jark"),
+                        Row.of(2, "ron"),
+                        Row.of(3, "leonard"),
+                        Row.of(1, "flink"),
+                        Row.of(2, "cdc"));
         assertThat(result).isEqualTo(expected);
 
         tEnv().executeSql("drop table t1");
         tEnv().executeSql("drop table t2");
+        // drop the function to clean up
+        tEnv().executeSql(dropFunctionDDL);
     }
 
     @Test
@@ -596,7 +732,7 @@ public class FunctionITCase extends StreamingTestBase {
     }
 
     @Test
-    public void testVarArgScalarFunction() throws Exception {
+    public void testVarArgScalarFunction() {
         final List<Row> sourceData = Arrays.asList(Row.of("Bob", 1), Row.of("Alice", 2));
 
         TestCollectionTableFactory.reset();
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
index b07a859da50..b852661f34e 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/PlannerMocks.java
@@ -31,11 +31,13 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema;
 import org.apache.flink.table.planner.delegation.ParserImpl;
 import org.apache.flink.table.planner.delegation.PlannerContext;
+import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.calcite.plan.RelTraitDef;
 
+import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 
@@ -58,6 +60,7 @@ public class PlannerMocks {
     private PlannerMocks(
             boolean isBatchMode,
             TableConfig tableConfig,
+            ResourceManager resourceManager,
             CatalogManager catalogManager,
             List<RelTraitDef> traitDefs,
             CalciteSchema rootSchema) {
@@ -67,11 +70,7 @@ public class PlannerMocks {
         final ModuleManager moduleManager = new ModuleManager();
 
         this.functionCatalog =
-                new FunctionCatalog(
-                        tableConfig,
-                        catalogManager,
-                        moduleManager,
-                        PlannerMocks.class.getClassLoader());
+                new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager);
 
         this.plannerContext =
                 new PlannerContext(
@@ -156,6 +155,11 @@ public class PlannerMocks {
         private boolean batchMode = false;
         private TableConfig tableConfig = TableConfig.getDefault();
         private CatalogManager catalogManager = CatalogManagerMocks.createEmptyCatalogManager();
+        private ResourceManager resourceManager =
+                ResourceManager.createResourceManager(
+                        new URL[0],
+                        Thread.currentThread().getContextClassLoader(),
+                        tableConfig.getConfiguration());
         private List<RelTraitDef> traitDefs = Collections.emptyList();
         private CalciteSchema rootSchema;
 
@@ -176,6 +180,11 @@ public class PlannerMocks {
             return this;
         }
 
+        public Builder withResourceManager(ResourceManager resourceManager) {
+            this.resourceManager = resourceManager;
+            return this;
+        }
+
         public Builder withCatalogManager(CatalogManager catalogManager) {
             this.catalogManager = catalogManager;
             return this;
@@ -192,7 +201,8 @@ public class PlannerMocks {
         }
 
         public PlannerMocks build() {
-            return new PlannerMocks(batchMode, tableConfig, catalogManager, traitDefs, rootSchema);
+            return new PlannerMocks(
+                    batchMode, tableConfig, resourceManager, catalogManager, traitDefs, rootSchema);
         }
     }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 48c110c680b..2eb87e35cc8 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -30,6 +30,7 @@ import org.apache.flink.table.planner.expressions.utils.Func1
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
 import org.apache.flink.table.planner.functions.utils.ScalarSqlFunction
 import org.apache.flink.table.planner.utils.{DateTimeTestUtil, IntSumAggFunction}
+import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.utils.CatalogManagerMocks
 
 import org.apache.calcite.rel.`type`.RelDataType
@@ -43,6 +44,7 @@ import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat, assertTrue
 import org.junit.Test
 
 import java.math.BigDecimal
+import java.net.URL
 import java.time.ZoneId
 import java.util.{Arrays, List => JList, TimeZone}
 
@@ -50,13 +52,15 @@ import scala.collection.JavaConverters._
 
 /** Test for [[RexNodeExtractor]]. */
 class RexNodeExtractorTest extends RexNodeTestBase {
+  val tableConfig = TableConfig.getDefault
+  val resourceManager = ResourceManager.createResourceManager(
+    new Array[URL](0),
+    Thread.currentThread.getContextClassLoader,
+    tableConfig.getConfiguration)
   val catalogManager: CatalogManager = CatalogManagerMocks.createEmptyCatalogManager()
   val moduleManager = new ModuleManager
-  private val functionCatalog = new FunctionCatalog(
-    TableConfig.getDefault,
-    catalogManager,
-    moduleManager,
-    classOf[RexNodeExtractorTest].getClassLoader)
+  private val functionCatalog =
+    new FunctionCatalog(tableConfig, resourceManager, catalogManager, moduleManager)
 
   @Test
   def testExtractRefInputFields(): Unit = {
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
index 7ec6be2a06d..d02495134f0 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
@@ -22,15 +22,12 @@ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.types.Row
 
-import org.junit.{Assert, Before}
-
-import java.lang.AssertionError
+import org.junit.Before
 
 import scala.collection.Seq
 
 /** Batch [[FileSystemITCaseBase]]. */
 abstract class BatchFileSystemITCaseBase extends BatchTestBase with FileSystemITCaseBase {
-
   @Before
   override def before(): Unit = {
     super.before()
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
index 7947b689f8d..ad51e49412c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/PartitionableSourceITCase.scala
@@ -17,26 +17,21 @@
  */
 package org.apache.flink.table.planner.runtime.batch.sql
 
-import org.apache.flink.client.ClientUtils
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.table.api.{EnvironmentSettings, TableConfig}
-import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.catalog.{CatalogPartitionImpl, CatalogPartitionSpec, ObjectPath}
-import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.factories.{TestValuesCatalog, TestValuesTableFactory}
 import org.apache.flink.table.planner.runtime.utils.BatchAbstractTestBase.TEMPORARY_FOLDER
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase
 import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
 import org.apache.flink.table.planner.utils.TestingTableEnvironment
+import org.apache.flink.table.resource.{ResourceType, ResourceUri}
 import org.apache.flink.util.UserClassLoaderJarTestUtils
 
 import org.junit.{Before, Test}
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 
-import java.io.File
-import java.net.URL
 import java.util
+import java.util.Collections
 
 import scala.collection.JavaConversions._
 
@@ -44,40 +39,8 @@ import scala.collection.JavaConversions._
 class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatalogFilter: Boolean)
   extends BatchTestBase {
 
-  final private val UDF_CLASS =
-    s"""
-       |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
-       |   public String eval(String str) {
-       |     return str.trim();
-       |   }
-       |}
-       |""".stripMargin
-
-  private def overrideTableEnv(): Unit = {
-    val tmpDir: File = TEMPORARY_FOLDER.newFolder()
-    val udfJarFile: File =
-      UserClassLoaderJarTestUtils.createJarFile(tmpDir, "flink-test-udf.jar", "TrimUDF", UDF_CLASS)
-    val jars: util.List[URL] = util.Collections.singletonList(udfJarFile.toURI.toURL)
-    val cl = ClientUtils.buildUserCodeClassLoader(
-      jars,
-      util.Collections.emptyList(),
-      getClass.getClassLoader,
-      new Configuration())
-
-    settings = EnvironmentSettings.newInstance().inBatchMode().withClassLoader(cl).build()
-    testingTableEnv = TestingTableEnvironment
-      .create(settings, catalogManager = None, TableConfig.getDefault)
-    tEnv = testingTableEnv
-    planner = tEnv.asInstanceOf[TableEnvironmentImpl].getPlanner.asInstanceOf[PlannerBase]
-    env = planner.getExecEnv
-    env.getConfig.enableObjectReuse()
-    tableConfig = tEnv.getConfig
-  }
-
   @Before
   override def before(): Unit = {
-    // override TableEnvironment by a user defined classloader
-    overrideTableEnv()
     super.before()
 
     env.setParallelism(1) // set sink parallelism to 1
@@ -217,6 +180,28 @@ class PartitionableSourceITCase(val sourceFetchPartitions: Boolean, val useCatal
 
   @Test
   def testPartitionPrunerCompileClassLoader(): Unit = {
+    val udfJavaCode =
+      s"""
+         |public class TrimUDF extends org.apache.flink.table.functions.ScalarFunction {
+         |   public String eval(String str) {
+         |     return str.trim();
+         |   }
+         |}
+         |""".stripMargin
+    val tmpDir = TEMPORARY_FOLDER.newFolder()
+    val udfJarFile =
+      UserClassLoaderJarTestUtils.createJarFile(
+        tmpDir,
+        "flink-test-udf.jar",
+        "TrimUDF",
+        udfJavaCode)
+
+    tEnv
+      .asInstanceOf[TestingTableEnvironment]
+      .getResourceManager
+      .registerJarResources(
+        Collections.singletonList(new ResourceUri(ResourceType.JAR, udfJarFile.toURI.toString)))
+
     tEnv.executeSql("create temporary function trimUDF as 'TrimUDF'")
     checkResult(
       "select * from PartitionableTable where trimUDF(part1) = 'A' and part2 > 1",
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
index 3eaca6e3809..35c21da04a6 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/BatchTestBase.scala
@@ -72,6 +72,7 @@ class BatchTestBase extends BatchAbstractTestBase {
     "(?s)From line ([0-9]+),"
       + " column ([0-9]+) to line ([0-9]+), column ([0-9]+): (.*)")
 
+  @throws(classOf[Exception])
   @Before
   def before(): Unit = {
     BatchTestBase.configForMiniCluster(tableConfig)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
index c622af7f69a..222bb656ecc 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/StreamingTestBase.scala
@@ -43,6 +43,7 @@ class StreamingTestBase extends AbstractTestBase {
   @Rule
   def tempFolder: TemporaryFolder = _tempFolder
 
+  @throws(classOf[Exception])
   @Before
   @BeforeEach
   def before(): Unit = {
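
The @throws(classOf[Exception]) annotations added to BatchTestBase and
StreamingTestBase are for Java interop: a Scala method compiles to bytecode
with no throws clause unless annotated, and Java forbids an override from
declaring checked exceptions that the overridden method does not declare.
Without the annotation, a Java subclass of these test bases could not
override before() with a 'throws Exception' clause. A minimal sketch, not
from this diff:

    // Base method: the annotation emits 'throws Exception' in the class file.
    @throws(classOf[Exception])
    def before(): Unit = {}
    // A Java subclass may now legally declare:
    //     @Override public void before() throws Exception { ... }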
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 1f99ea6e154..1b37f8dcb86 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -42,7 +42,7 @@ import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.factories.{FactoryUtil, PlannerFactoryUtil, StreamTableSourceFactory}
 import org.apache.flink.table.functions._
 import org.apache.flink.table.module.ModuleManager
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation, SinkModifyOperation}
+import org.apache.flink.table.operations.{ModifyOperation, QueryOperation}
 import org.apache.flink.table.planner.calcite.CalciteConfig
 import org.apache.flink.table.planner.delegation.PlannerBase
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable
@@ -57,6 +57,7 @@ import org.apache.flink.table.planner.runtime.utils.{TestingAppendTableSink, Tes
 import org.apache.flink.table.planner.sinks.CollectRowTableSink
 import org.apache.flink.table.planner.utils.PlanKind.PlanKind
 import org.apache.flink.table.planner.utils.TableTestUtil.{replaceNodeIdInOperator, replaceStageId, replaceStreamNodeId}
+import org.apache.flink.table.resource.ResourceManager
 import org.apache.flink.table.runtime.types.TypeInfoLogicalTypeConverter.fromLogicalTypeToTypeInfo
 import org.apache.flink.table.sinks._
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
@@ -64,6 +65,7 @@ import org.apache.flink.table.types.logical.LogicalType
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 import org.apache.flink.types.Row
+import org.apache.flink.util.{FlinkUserCodeClassLoaders, MutableURLClassLoader}
 
 import _root_.java.math.{BigDecimal => JBigDecimal}
 import _root_.java.util
@@ -78,6 +80,7 @@ import org.junit.Rule
 import org.junit.rules.{ExpectedException, TemporaryFolder, TestName}
 
 import java.io.{File, IOException}
+import java.net.URL
 import java.nio.file.{Files, Paths}
 import java.time.Duration
 
@@ -1431,21 +1434,23 @@ class TestTableSourceFactory extends StreamTableSourceFactory[Row] {
 class TestingTableEnvironment private (
     catalogManager: CatalogManager,
     moduleManager: ModuleManager,
+    resourceManager: ResourceManager,
     tableConfig: TableConfig,
     executor: Executor,
     functionCatalog: FunctionCatalog,
     planner: PlannerBase,
-    isStreamingMode: Boolean,
-    userClassLoader: ClassLoader)
+    isStreamingMode: Boolean)
   extends TableEnvironmentImpl(
     catalogManager,
     moduleManager,
+    resourceManager,
     tableConfig,
     executor,
     functionCatalog,
     planner,
-    isStreamingMode,
-    userClassLoader) {
+    isStreamingMode) {
+
+  def getResourceManager: ResourceManager = resourceManager
 
   // just for testing, remove this method while
   // `<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);`
@@ -1507,10 +1512,14 @@ object TestingTableEnvironment {
       catalogManager: Option[CatalogManager] = None,
       tableConfig: TableConfig): TestingTableEnvironment = {
 
-    val classLoader = settings.getUserClassLoader
+    val userClassLoader: MutableURLClassLoader =
+      FlinkUserCodeClassLoaders.create(
+        new Array[URL](0),
+        settings.getUserClassLoader,
+        settings.getConfiguration)
 
     val executorFactory = FactoryUtil.discoverFactory(
-      classLoader,
+      userClassLoader,
       classOf[ExecutorFactory],
       ExecutorFactory.DEFAULT_IDENTIFIER)
 
@@ -1519,13 +1528,14 @@ object TestingTableEnvironment {
     tableConfig.setRootConfiguration(executor.getConfiguration)
     tableConfig.addConfiguration(settings.getConfiguration)
 
+    val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader)
     val moduleManager = new ModuleManager
 
     val catalogMgr = catalogManager match {
       case Some(c) => c
       case _ =>
         CatalogManager.newBuilder
-          .classLoader(classLoader)
+          .classLoader(userClassLoader)
           .config(tableConfig)
           .defaultCatalog(
             settings.getBuiltInCatalogName,
@@ -1536,21 +1546,27 @@ object TestingTableEnvironment {
     }
 
     val functionCatalog =
-      new FunctionCatalog(settings.getConfiguration, catalogMgr, moduleManager, classLoader)
+      new FunctionCatalog(settings.getConfiguration, resourceManager, catalogMgr, moduleManager)
 
     val planner = PlannerFactoryUtil
-      .createPlanner(executor, tableConfig, classLoader, moduleManager, catalogMgr, functionCatalog)
+      .createPlanner(
+        executor,
+        tableConfig,
+        userClassLoader,
+        moduleManager,
+        catalogMgr,
+        functionCatalog)
       .asInstanceOf[PlannerBase]
 
     new TestingTableEnvironment(
       catalogMgr,
       moduleManager,
+      resourceManager,
       tableConfig,
       executor,
       functionCatalog,
       planner,
-      settings.isStreamingMode,
-      classLoader)
+      settings.isStreamingMode)
   }
 }
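
Taken together, the TestingTableEnvironment changes mirror the production
wiring introduced earlier in this commit: the environment no longer carries
a bare ClassLoader but a ResourceManager wrapped around a
MutableURLClassLoader, so jars registered at runtime become visible to the
planner. A condensed sketch of that wiring, assembled only from calls that
appear in this diff (conf and udfJarFile are placeholders):

    // Condensed illustration of the wiring above; not a verbatim excerpt.
    val conf = new Configuration()
    val userClassLoader: MutableURLClassLoader =
      FlinkUserCodeClassLoaders.create(new Array[URL](0), getClass.getClassLoader, conf)
    val resourceManager = new ResourceManager(conf, userClassLoader)
    // Registering a jar makes its classes loadable via userClassLoader:
    resourceManager.registerJarResources(
      Collections.singletonList(new ResourceUri(ResourceType.JAR, udfJarFile.toURI.toString)))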