Posted to commits@flink.apache.org by ja...@apache.org on 2022/09/07 02:31:40 UTC

[flink] branch master updated (016fa93813d -> 481ed78bec4)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


    from 016fa93813d [hotfix][docs][release] Update the building branch in workflow
     new 523546101f0 [fixup][table-planner] Using user classloader instead of thread context classloader
     new e0257883900 [FLINK-29074][Connectors/JDBC] Fix ClassNotFound exception when using jdbc connector by add jar syntax
     new 481ed78bec4 [FLINK-29096][table] Keep backward compatibility of JdbcCatalog constructor

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 23 +++++++---
 .../flink/connector/jdbc/catalog/JdbcCatalog.java  | 41 ++++++++++++++---
 .../connector/jdbc/catalog/JdbcCatalogUtils.java   |  9 ++--
 .../flink/connector/jdbc/catalog/MySqlCatalog.java | 37 +++++++++------
 .../connector/jdbc/catalog/PostgresCatalog.java    |  3 +-
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   |  1 +
 .../connector/jdbc/dialect/JdbcDialectLoader.java  |  6 +--
 .../internal/options/JdbcConnectorOptions.java     | 19 +++++++-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 29 +++++++-----
 .../jdbc/table/JdbcRowDataLookupFunction.java      |  4 +-
 .../jdbc/catalog/MySqlCatalogTestBase.java         |  1 +
 .../jdbc/catalog/PostgresCatalogTestBase.java      |  1 +
 .../catalog/factory/JdbcCatalogFactoryTest.java    |  1 +
 .../oracle/OraclePreparedStatementTest.java        |  4 +-
 .../FieldNamedPreparedStatementImplTest.java       |  3 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       |  5 ++
 .../exec/batch/BatchExecPythonGroupAggregate.java  |  9 ++--
 .../batch/BatchExecPythonGroupWindowAggregate.java |  8 ++--
 .../exec/batch/BatchExecPythonOverAggregate.java   |  8 ++--
 .../nodes/exec/common/CommonExecPythonCalc.java    | 30 ++++++++----
 .../exec/common/CommonExecPythonCorrelate.java     | 24 ++++++----
 .../stream/StreamExecPythonGroupAggregate.java     | 11 +++--
 .../StreamExecPythonGroupTableAggregate.java       | 12 +++--
 .../StreamExecPythonGroupWindowAggregate.java      | 16 +++++--
 .../exec/stream/StreamExecPythonOverAggregate.java | 10 ++--
 .../plan/nodes/exec/utils/CommonPythonUtil.java    | 53 ++++++++++++++--------
 .../physical/common/CommonPhysicalMatchRule.java   |  3 +-
 .../table/planner/delegation/PlannerBase.scala     |  2 +-
 .../physical/batch/BatchPhysicalSortRule.scala     |  1 -
 29 files changed, 259 insertions(+), 115 deletions(-)


[flink] 03/03: [FLINK-29096][table] Keep backward compatibility of JdbcCatalog constructor

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 481ed78bec4211561e78be7586a102bd37a4dfb1
Author: Jark Wu <ja...@apache.org>
AuthorDate: Tue Sep 6 19:50:02 2022 +0800

    [FLINK-29096][table] Keep backward compatibility of JdbcCatalog constructor
---
 .../flink/connector/jdbc/catalog/JdbcCatalog.java  | 36 +++++++++++++++++++---
 1 file changed, 31 insertions(+), 5 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
index 0a9317154cc..fd8a6b5de6c 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
@@ -27,19 +27,45 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.List;
 
 /** Catalogs for relational databases via JDBC. */
 @PublicEvolving
 public class JdbcCatalog extends AbstractJdbcCatalog {
 
-    private static final Logger LOG = LoggerFactory.getLogger(JdbcCatalog.class);
-
     private final AbstractJdbcCatalog internal;
 
+    /**
+     * Creates a JdbcCatalog.
+     *
+     * @deprecated please use {@link JdbcCatalog#JdbcCatalog(ClassLoader, String, String, String,
+     *     String, String)} instead.
+     */
+    public JdbcCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        this(
+                Thread.currentThread().getContextClassLoader(),
+                catalogName,
+                defaultDatabase,
+                username,
+                pwd,
+                baseUrl);
+    }
+
+    /**
+     * Creates a JdbcCatalog.
+     *
+     * @param userClassLoader the classloader used to load the JDBC driver
+     * @param catalogName the registered catalog name
+     * @param defaultDatabase the default database name
+     * @param username the username used to connect to the database
+     * @param pwd the password used to connect to the database
+     * @param baseUrl the base URL of the database, e.g. jdbc:mysql://localhost:3306
+     */
     public JdbcCatalog(
             ClassLoader userClassLoader,
             String catalogName,


[flink] 02/03: [FLINK-29074][Connectors/JDBC] Fix ClassNotFound exception when using jdbc connector by add jar syntax

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0257883900b106de433836e1e553772eed43e6c
Author: fengli <ld...@163.com>
AuthorDate: Mon Aug 29 20:18:20 2022 +0800

    [FLINK-29074][Connectors/JDBC] Fix ClassNotFound exception when using jdbc connector by add jar syntax
    
    This closes #20707
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 23 ++++++++++----
 .../flink/connector/jdbc/catalog/JdbcCatalog.java  |  5 +--
 .../connector/jdbc/catalog/JdbcCatalogUtils.java   |  9 ++++--
 .../flink/connector/jdbc/catalog/MySqlCatalog.java | 37 ++++++++++++++--------
 .../connector/jdbc/catalog/PostgresCatalog.java    |  3 +-
 .../jdbc/catalog/factory/JdbcCatalogFactory.java   |  1 +
 .../connector/jdbc/dialect/JdbcDialectLoader.java  |  6 ++--
 .../internal/options/JdbcConnectorOptions.java     | 19 ++++++++++-
 .../jdbc/table/JdbcDynamicTableFactory.java        | 29 ++++++++++-------
 .../jdbc/table/JdbcRowDataLookupFunction.java      |  4 +--
 .../jdbc/catalog/MySqlCatalogTestBase.java         |  1 +
 .../jdbc/catalog/PostgresCatalogTestBase.java      |  1 +
 .../catalog/factory/JdbcCatalogFactoryTest.java    |  1 +
 .../oracle/OraclePreparedStatementTest.java        |  4 ++-
 .../FieldNamedPreparedStatementImplTest.java       |  3 +-
 .../jdbc/table/JdbcAppendOnlyWriterTest.java       |  5 +++
 16 files changed, 103 insertions(+), 48 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index 82410850420..482ce79398e 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -51,6 +51,7 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.commons.compress.utils.Lists;
 import org.slf4j.Logger;
@@ -79,18 +80,21 @@ import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAM
 import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
 import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Abstract catalog for any JDBC catalogs. */
 public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 
     private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcCatalog.class);
 
+    protected final ClassLoader userClassLoader;
     protected final String username;
     protected final String pwd;
     protected final String baseUrl;
     protected final String defaultUrl;
 
     public AbstractJdbcCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
@@ -98,12 +102,14 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
             String baseUrl) {
         super(catalogName, defaultDatabase);
 
+        checkNotNull(userClassLoader);
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(username));
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(pwd));
         checkArgument(!StringUtils.isNullOrWhitespaceOnly(baseUrl));
 
         JdbcCatalogUtils.validateJdbcUrl(baseUrl);
 
+        this.userClassLoader = userClassLoader;
         this.username = username;
         this.pwd = pwd;
         this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
@@ -112,14 +118,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
 
     @Override
     public void open() throws CatalogException {
-        // test connection, fail early if we cannot connect to database
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-        } catch (SQLException e) {
-            throw new ValidationException(
-                    String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+        // load the driver using userClassLoader explicitly, see FLINK-15635 for more detail
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            // test connection, fail early if we cannot connect to database
+            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            } catch (SQLException e) {
+                throw new ValidationException(
+                        String.format("Failed connecting to %s via JDBC.", defaultUrl), e);
+            }
+            LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
         }
-
-        LOG.info("Catalog {} established connection to {}", getName(), defaultUrl);
     }
 
     @Override
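
The hunk above is the heart of the fix: DriverManager resolves drivers through
the thread context classloader, so open() temporarily installs userClassLoader
before connecting and restores the previous loader afterwards. A standalone
sketch of the same idiom, with placeholder connection details:

    import org.apache.flink.util.TemporaryClassLoaderContext;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class DriverLoadingSketch {
        public static void main(String[] args) {
            ClassLoader userClassLoader = DriverLoadingSketch.class.getClassLoader();
            // Swap the thread context classloader for the duration of the
            // lookup; try-with-resources restores the previous one on close().
            try (TemporaryClassLoaderContext ignored =
                    TemporaryClassLoaderContext.of(userClassLoader)) {
                try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:mysql://localhost:3306/mydb", "user", "secret")) {
                    // Connection succeeded: the driver was found via userClassLoader.
                } catch (SQLException e) {
                    throw new IllegalStateException("Failed to connect via JDBC.", e);
                }
            }
        }
    }
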
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
index 1592ef4a120..0a9317154cc 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java
@@ -41,16 +41,17 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
     private final AbstractJdbcCatalog internal;
 
     public JdbcCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
 
         internal =
                 JdbcCatalogUtils.createCatalog(
-                        catalogName, defaultDatabase, username, pwd, baseUrl);
+                        userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
     }
 
     // ------ databases -----
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
index fa1af795d5b..28bea80595b 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
@@ -39,17 +39,20 @@ public class JdbcCatalogUtils {
 
     /** Create catalog instance from given information. */
     public static AbstractJdbcCatalog createCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl);
+        JdbcDialect dialect = JdbcDialectLoader.load(baseUrl, userClassLoader);
 
         if (dialect instanceof PostgresDialect) {
-            return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+            return new PostgresCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
         } else if (dialect instanceof MySqlDialect) {
-            return new MySqlCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+            return new MySqlCatalog(
+                    userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
         } else {
             throw new UnsupportedOperationException(
                     String.format("Catalog for '%s' is not supported yet.", dialect));
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
index de3dfe200b4..d88b2288690 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TemporaryClassLoaderContext;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -60,12 +61,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
             };
 
     public MySqlCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
 
         String driverVersion =
                 Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
@@ -118,23 +120,30 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
     }
 
     private String getDatabaseVersion() {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            return conn.getMetaData().getDatabaseProductVersion();
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+                return conn.getMetaData().getDatabaseProductVersion();
+            } catch (Exception e) {
+                throw new CatalogException(
+                        String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+            }
         }
     }
 
     private String getDriverVersion() {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            String driverVersion = conn.getMetaData().getDriverVersion();
-            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
-            Matcher matcher = regexp.matcher(driverVersion);
-            return matcher.find() ? matcher.group(0) : null;
-        } catch (Exception e) {
-            throw new CatalogException(
-                    String.format("Failed in getting MySQL driver version by %s.", defaultUrl), e);
+        try (TemporaryClassLoaderContext ignored =
+                TemporaryClassLoaderContext.of(userClassLoader)) {
+            try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+                String driverVersion = conn.getMetaData().getDriverVersion();
+                Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+                Matcher matcher = regexp.matcher(driverVersion);
+                return matcher.find() ? matcher.group(0) : null;
+            } catch (Exception e) {
+                throw new CatalogException(
+                        String.format("Failed in getting MySQL driver version by %s.", defaultUrl),
+                        e);
+            }
         }
     }
 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
index 5d767e540cd..de41c01a1b1 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java
@@ -71,12 +71,13 @@ public class PostgresCatalog extends AbstractJdbcCatalog {
     private final JdbcDialectTypeMapper dialectTypeMapper;
 
     protected PostgresCatalog(
+            ClassLoader userClassLoader,
             String catalogName,
             String defaultDatabase,
             String username,
             String pwd,
             String baseUrl) {
-        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        super(userClassLoader, catalogName, defaultDatabase, username, pwd, baseUrl);
         this.dialectTypeMapper = new PostgresTypeMapper();
     }
 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
index 2fb74f24986..9677744d2f4 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactory.java
@@ -70,6 +70,7 @@ public class JdbcCatalogFactory implements CatalogFactory {
         helper.validate();
 
         return new JdbcCatalog(
+                context.getClassLoader(),
                 context.getName(),
                 helper.getOptions().get(DEFAULT_DATABASE),
                 helper.getOptions().get(USERNAME),
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
index fa9253cf149..6dfad9b0e47 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/JdbcDialectLoader.java
@@ -41,13 +41,13 @@ public final class JdbcDialectLoader {
      * Loads the unique JDBC Dialect that can handle the given database url.
      *
      * @param url A database URL.
+     * @param classLoader the classloader used to load the factory
      * @throws IllegalStateException if the loader cannot find exactly one dialect that can
      *     unambiguously process the given database URL.
      * @return The loaded dialect.
      */
-    public static JdbcDialect load(String url) {
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        List<JdbcDialectFactory> foundFactories = discoverFactories(cl);
+    public static JdbcDialect load(String url, ClassLoader classLoader) {
+        List<JdbcDialectFactory> foundFactories = discoverFactories(classLoader);
 
         if (foundFactories.isEmpty()) {
             throw new IllegalStateException(
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
index 89d22bd3067..0b2562ab14e 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/options/JdbcConnectorOptions.java
@@ -105,6 +105,7 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions {
 
     /** Builder of {@link JdbcConnectorOptions}. */
     public static class Builder {
+        private ClassLoader classLoader;
         private String dbURL;
         private String tableName;
         private String driverName;
@@ -114,6 +115,19 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions {
         private Integer parallelism;
         private int connectionCheckTimeoutSeconds = 60;
 
+        /**
+         * optional, specifies the classloader the planner uses to load classes from the user jar.
+         *
+         * <p>By default, this is configured using {@code
+         * Thread.currentThread().getContextClassLoader()}.
+         *
+         * <p>Modify the {@link ClassLoader} only if you know what you're doing.
+         */
+        public Builder setClassLoader(ClassLoader classLoader) {
+            this.classLoader = classLoader;
+            return this;
+        }
+
         /** required, table name. */
         public Builder setTableName(String tableName) {
             this.tableName = tableName;
@@ -171,7 +185,10 @@ public class JdbcConnectorOptions extends JdbcConnectionOptions {
             checkNotNull(dbURL, "No dbURL supplied.");
             checkNotNull(tableName, "No tableName supplied.");
             if (this.dialect == null) {
-                this.dialect = JdbcDialectLoader.load(dbURL);
+                if (classLoader == null) {
+                    classLoader = Thread.currentThread().getContextClassLoader();
+                }
+                this.dialect = JdbcDialectLoader.load(dbURL, classLoader);
             }
             if (this.driverName == null) {
                 Optional<String> optional = dialect.defaultDriverName();
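
From the caller's side, the new builder option can be wired as below; the URL
and table name are placeholders, and omitting setClassLoader keeps the
thread-context fallback shown in the hunk above:

    import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;

    public class JdbcOptionsSketch {
        public static void main(String[] args) {
            JdbcConnectorOptions options =
                    JdbcConnectorOptions.builder()
                            // Optional: classloader used to discover the dialect.
                            .setClassLoader(JdbcOptionsSketch.class.getClassLoader())
                            .setDBUrl("jdbc:mysql://localhost:3306/mydb")
                            .setTableName("orders")
                            .build();
        }
    }
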
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
index 5760686ebaa..bff5a498a51 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableFactory.java
@@ -87,9 +87,10 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         final ReadableConfig config = helper.getOptions();
 
         helper.validate();
-        validateConfigOptions(config);
-        validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL));
-        JdbcConnectorOptions jdbcOptions = getJdbcOptions(config);
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
+        JdbcConnectorOptions jdbcOptions = getJdbcOptions(config, context.getClassLoader());
 
         return new JdbcDynamicTableSink(
                 jdbcOptions,
@@ -108,28 +109,32 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
         final ReadableConfig config = helper.getOptions();
 
         helper.validate();
-        validateConfigOptions(config);
-        validateDataTypeWithJdbcDialect(context.getPhysicalRowDataType(), config.get(URL));
+        validateConfigOptions(config, context.getClassLoader());
+        validateDataTypeWithJdbcDialect(
+                context.getPhysicalRowDataType(), config.get(URL), context.getClassLoader());
         return new JdbcDynamicTableSource(
-                getJdbcOptions(helper.getOptions()),
+                getJdbcOptions(helper.getOptions(), context.getClassLoader()),
                 getJdbcReadOptions(helper.getOptions()),
                 helper.getOptions().get(LookupOptions.MAX_RETRIES),
                 getLookupCache(config),
                 context.getPhysicalRowDataType());
     }
 
-    private static void validateDataTypeWithJdbcDialect(DataType dataType, String url) {
-        final JdbcDialect dialect = JdbcDialectLoader.load(url);
+    private static void validateDataTypeWithJdbcDialect(
+            DataType dataType, String url, ClassLoader classLoader) {
+        final JdbcDialect dialect = JdbcDialectLoader.load(url, classLoader);
         dialect.validate((RowType) dataType.getLogicalType());
     }
 
-    private JdbcConnectorOptions getJdbcOptions(ReadableConfig readableConfig) {
+    private JdbcConnectorOptions getJdbcOptions(
+            ReadableConfig readableConfig, ClassLoader classLoader) {
         final String url = readableConfig.get(URL);
         final JdbcConnectorOptions.Builder builder =
                 JdbcConnectorOptions.builder()
+                        .setClassLoader(classLoader)
                         .setDBUrl(url)
                         .setTableName(readableConfig.get(TABLE_NAME))
-                        .setDialect(JdbcDialectLoader.load(url))
+                        .setDialect(JdbcDialectLoader.load(url, classLoader))
                         .setParallelism(readableConfig.getOptional(SINK_PARALLELISM).orElse(null))
                         .setConnectionCheckTimeoutSeconds(
                                 (int) readableConfig.get(MAX_RETRY_TIMEOUT).getSeconds());
@@ -260,9 +265,9 @@ public class JdbcDynamicTableFactory implements DynamicTableSourceFactory, Dynam
                 .collect(Collectors.toSet());
     }
 
-    private void validateConfigOptions(ReadableConfig config) {
+    private void validateConfigOptions(ReadableConfig config, ClassLoader classLoader) {
         String jdbcUrl = config.get(URL);
-        JdbcDialectLoader.load(jdbcUrl);
+        JdbcDialectLoader.load(jdbcUrl, classLoader);
 
         checkAllOrNone(config, new ConfigOption[] {USERNAME, PASSWORD});
 
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
index 6f84965a8ca..cb344b2b865 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcRowDataLookupFunction.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
 import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
-import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
@@ -96,8 +95,7 @@ public class JdbcRowDataLookupFunction extends LookupFunction {
         this.query =
                 options.getDialect()
                         .getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
-        String dbURL = options.getDbURL();
-        JdbcDialect jdbcDialect = JdbcDialectLoader.load(dbURL);
+        JdbcDialect jdbcDialect = options.getDialect();
         this.jdbcRowConverter = jdbcDialect.getRowConverter(rowType);
         this.lookupKeyRowConverter =
                 jdbcDialect.getRowConverter(
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index c06d0d6c266..d92c89ef159 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -129,6 +129,7 @@ public class MySqlCatalogTestBase {
             CATALOGS.put(
                     dockerImageName,
                     new MySqlCatalog(
+                            Thread.currentThread().getContextClassLoader(),
                             TEST_CATALOG_NAME,
                             TEST_DB,
                             TEST_USERNAME,
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
index 4b94ae5024e..73b56a3a1d7 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalogTestBase.java
@@ -82,6 +82,7 @@ public class PostgresCatalogTestBase {
 
         catalog =
                 new PostgresCatalog(
+                        Thread.currentThread().getContextClassLoader(),
                         TEST_CATALOG_NAME,
                         PostgresCatalog.DEFAULT_DATABASE,
                         TEST_USERNAME,
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
index 2da31122747..1527a078574 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/factory/JdbcCatalogFactoryTest.java
@@ -71,6 +71,7 @@ public class JdbcCatalogFactoryTest {
 
         catalog =
                 new JdbcCatalog(
+                        Thread.currentThread().getContextClassLoader(),
                         TEST_CATALOG_NAME,
                         PostgresCatalog.DEFAULT_DATABASE,
                         TEST_USERNAME,
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
index cc7611470d9..b4827b6f635 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/oracle/OraclePreparedStatementTest.java
@@ -35,7 +35,9 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link OraclePreparedStatementTest}. */
 public class OraclePreparedStatementTest {
 
-    private final JdbcDialect dialect = JdbcDialectLoader.load("jdbc:oracle://localhost:3306/test");
+    private final JdbcDialect dialect =
+            JdbcDialectLoader.load(
+                    "jdbc:oracle://localhost:3306/test", getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
index 6013df6ecb9..013c711c28d 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/statement/FieldNamedPreparedStatementImplTest.java
@@ -34,7 +34,8 @@ import static org.assertj.core.api.Assertions.assertThat;
 /** Tests for {@link FieldNamedPreparedStatementImpl}. */
 public class FieldNamedPreparedStatementImplTest {
 
-    private final JdbcDialect dialect = JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test");
+    private final JdbcDialect dialect =
+            JdbcDialectLoader.load("jdbc:mysql://localhost:3306/test", getClass().getClassLoader());
     private final String[] fieldNames =
             new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
     private final String[] keyFields = new String[] {"id", "__field_3__"};
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
index 2617832bbc8..66b0cc41296 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/table/JdbcAppendOnlyWriterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.jdbc.DbMetadata;
 import org.apache.flink.connector.jdbc.JdbcTestBase;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
 import org.apache.flink.connector.jdbc.internal.JdbcOutputFormat;
 import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
 
@@ -61,6 +62,10 @@ public class JdbcAppendOnlyWriterTest extends JdbcTestBase {
                         .setOptions(
                                 JdbcConnectorOptions.builder()
                                         .setDBUrl(getDbMetadata().getUrl())
+                                        .setDialect(
+                                                JdbcDialectLoader.load(
+                                                        getDbMetadata().getUrl(),
+                                                        getClass().getClassLoader()))
                                         .setTableName(OUTPUT_TABLE)
                                         .build())
                         .setFieldNames(fieldNames)


[flink] 01/03: [fixup][table-planner] Using user classloader instead of thread context classloader

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 523546101f0180999f11d68269aad53c59134064
Author: fengli <ld...@163.com>
AuthorDate: Mon Aug 29 20:10:53 2022 +0800

    [fixup][table-planner] Using user classloader instead of thread context classloader
---
 .../exec/batch/BatchExecPythonGroupAggregate.java  |  9 ++--
 .../batch/BatchExecPythonGroupWindowAggregate.java |  8 ++--
 .../exec/batch/BatchExecPythonOverAggregate.java   |  8 ++--
 .../nodes/exec/common/CommonExecPythonCalc.java    | 30 ++++++++----
 .../exec/common/CommonExecPythonCorrelate.java     | 24 ++++++----
 .../stream/StreamExecPythonGroupAggregate.java     | 11 +++--
 .../StreamExecPythonGroupTableAggregate.java       | 12 +++--
 .../StreamExecPythonGroupWindowAggregate.java      | 16 +++++--
 .../exec/stream/StreamExecPythonOverAggregate.java | 10 ++--
 .../plan/nodes/exec/utils/CommonPythonUtil.java    | 53 ++++++++++++++--------
 .../physical/common/CommonPhysicalMatchRule.java   |  3 +-
 .../table/planner/delegation/PlannerBase.scala     |  2 +-
 .../physical/batch/BatchPhysicalSortRule.scala     |  1 -
 13 files changed, 125 insertions(+), 62 deletions(-)
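
The planner-side changes below all follow one principle: every CommonPythonUtil
call now receives planner.getFlinkContext().getClassLoader() explicitly instead
of silently reading the thread context classloader. As a rough sketch of that
idiom (an assumption about the helper's shape, not the actual CommonPythonUtil
implementation):

    // Hypothetical helper illustrating the pattern adopted in this commit:
    // resolve classes through an explicitly supplied classloader rather than
    // Thread.currentThread().getContextClassLoader().
    public final class ExplicitClassLoading {
        public static Class<?> loadClass(String className, ClassLoader classLoader) {
            try {
                // Class.forName with an explicit loader is the standard JDK
                // way to avoid depending on the thread context classloader.
                return Class.forName(className, false, classLoader);
            } catch (ClassNotFoundException e) {
                throw new IllegalStateException(
                        "Could not load " + className + " with the given classloader.", e);
            }
        }
    }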

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
index dbb6033c364..98e2ca2551d 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupAggregate.java
@@ -94,7 +94,8 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase<RowData>
         final RowType inputRowType = (RowType) inputEdge.getOutputType();
         final RowType outputRowType = InternalTypeInfo.of(getOutputType()).toRowType();
         Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
@@ -104,7 +105,8 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase<RowData>
                         config,
                         planner.getFlinkContext().getClassLoader());
 
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         return transform;
@@ -149,7 +151,8 @@ public class BatchExecPythonGroupAggregate extends ExecNodeBase<RowData>
             int[] udafInputOffsets,
             PythonFunctionInfo[] pythonFunctionInfos) {
         final Class<?> clazz =
-                CommonPythonUtil.loadClass(ARROW_PYTHON_AGGREGATE_FUNCTION_OPERATOR_NAME);
+                CommonPythonUtil.loadClass(
+                        ARROW_PYTHON_AGGREGATE_FUNCTION_OPERATOR_NAME, classLoader);
 
         RowType udfInputType = (RowType) Projection.of(udafInputOffsets).project(inputRowType);
         RowType udfOutputType =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
index ae8a9c2ad02..930a2f7fe59 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonGroupWindowAggregate.java
@@ -114,7 +114,8 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase<RowData>
 
         final Tuple2<Long, Long> windowSizeAndSlideSize = WindowCodeGenerator.getWindowDef(window);
         final Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         int groupBufferLimitSize =
                 pythonConfig.getInteger(
                         ExecutionConfigOptions.TABLE_EXEC_WINDOW_AGG_BUFFER_SIZE_LIMIT);
@@ -130,7 +131,8 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase<RowData>
                         pythonConfig,
                         config,
                         planner.getFlinkContext().getClassLoader());
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         return transform;
@@ -204,7 +206,7 @@ public class BatchExecPythonGroupWindowAggregate extends ExecNodeBase<RowData>
             PythonFunctionInfo[] pythonFunctionInfos) {
         Class<?> clazz =
                 CommonPythonUtil.loadClass(
-                        ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+                        ARROW_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME, classLoader);
 
         RowType udfInputType = (RowType) Projection.of(udafInputOffsets).project(inputRowType);
         RowType udfOutputType =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
index 5023931259f..9f4717aa5ef 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecPythonOverAggregate.java
@@ -153,7 +153,8 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
             }
         }
         Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
@@ -163,7 +164,8 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
                         pythonConfig,
                         config,
                         planner.getFlinkContext().getClassLoader());
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         return transform;
@@ -213,7 +215,7 @@ public class BatchExecPythonOverAggregate extends BatchExecOverAggregateBase {
             PythonFunctionInfo[] pythonFunctionInfos) {
         Class<?> clazz =
                 CommonPythonUtil.loadClass(
-                        ARROW_PYTHON_OVER_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+                        ARROW_PYTHON_OVER_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME, classLoader);
 
         RowType udfInputType = (RowType) Projection.of(udafInputOffsets).project(inputRowType);
         RowType udfOutputType =
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
index e102de9d063..d0249791edd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCalc.java
@@ -108,14 +108,16 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> ret =
                 createPythonOneInputTransformation(
                         inputTransform,
                         config,
                         planner.getFlinkContext().getClassLoader(),
                         pythonConfig);
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             ret.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         return ret;
@@ -139,7 +141,7 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
                         .collect(Collectors.toList());
 
         Tuple2<int[], PythonFunctionInfo[]> extractResult =
-                extractPythonScalarFunctionInfos(pythonRexCalls);
+                extractPythonScalarFunctionInfos(pythonRexCalls, classLoader);
         int[] pythonUdfInputOffsets = extractResult.f0;
         PythonFunctionInfo[] pythonFunctionInfos = extractResult.f1;
         LogicalType[] inputLogicalTypes =
@@ -185,11 +187,14 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
     }
 
     private Tuple2<int[], PythonFunctionInfo[]> extractPythonScalarFunctionInfos(
-            List<RexCall> rexCalls) {
+            List<RexCall> rexCalls, ClassLoader classLoader) {
         LinkedHashMap<RexNode, Integer> inputNodes = new LinkedHashMap<>();
         PythonFunctionInfo[] pythonFunctionInfos =
                 rexCalls.stream()
-                        .map(x -> CommonPythonUtil.createPythonFunctionInfo(x, inputNodes))
+                        .map(
+                                x ->
+                                        CommonPythonUtil.createPythonFunctionInfo(
+                                                x, inputNodes, classLoader))
                         .collect(Collectors.toList())
                         .toArray(new PythonFunctionInfo[rexCalls.size()]);
 
@@ -221,14 +226,21 @@ public abstract class CommonExecPythonCalc extends ExecNodeBase<RowData>
             int[] forwardedFields,
             boolean isArrow) {
         Class<?> clazz;
-        boolean isInProcessMode = CommonPythonUtil.isPythonWorkerInProcessMode(pythonConfig);
+        boolean isInProcessMode =
+                CommonPythonUtil.isPythonWorkerInProcessMode(pythonConfig, classLoader);
         if (isArrow) {
-            clazz = CommonPythonUtil.loadClass(ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+            clazz =
+                    CommonPythonUtil.loadClass(
+                            ARROW_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME, classLoader);
         } else {
             if (isInProcessMode) {
-                clazz = CommonPythonUtil.loadClass(PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+                clazz =
+                        CommonPythonUtil.loadClass(
+                                PYTHON_SCALAR_FUNCTION_OPERATOR_NAME, classLoader);
             } else {
-                clazz = CommonPythonUtil.loadClass(EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME);
+                clazz =
+                        CommonPythonUtil.loadClass(
+                                EMBEDDED_PYTHON_SCALAR_FUNCTION_OPERATOR_NAME, classLoader);
             }
         }
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
index 8661fd9b5b6..81940866104 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecPythonCorrelate.java
@@ -102,7 +102,8 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
         final Transformation<RowData> inputTransform =
                 (Transformation<RowData>) inputEdge.translateToPlan(planner);
         final Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         final ExecNodeConfig pythonNodeConfig =
                 ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
         final OneInputTransformation<RowData, RowData> transform =
@@ -111,7 +112,8 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                         pythonNodeConfig,
                         planner.getFlinkContext().getClassLoader(),
                         pythonConfig);
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         return transform;
@@ -122,7 +124,8 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
             ExecNodeConfig pythonNodeConfig,
             ClassLoader classLoader,
             Configuration pythonConfig) {
-        Tuple2<int[], PythonFunctionInfo> extractResult = extractPythonTableFunctionInfo();
+        Tuple2<int[], PythonFunctionInfo> extractResult =
+                extractPythonTableFunctionInfo(classLoader);
         int[] pythonUdtfInputOffsets = extractResult.f0;
         PythonFunctionInfo pythonFunctionInfo = extractResult.f1;
         InternalTypeInfo<RowData> pythonOperatorInputRowType =
@@ -146,10 +149,11 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                 inputTransform.getParallelism());
     }
 
-    private Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo() {
+    private Tuple2<int[], PythonFunctionInfo> extractPythonTableFunctionInfo(
+            ClassLoader classLoader) {
         LinkedHashMap<RexNode, Integer> inputNodes = new LinkedHashMap<>();
         PythonFunctionInfo pythonTableFunctionInfo =
-                CommonPythonUtil.createPythonFunctionInfo(invocation, inputNodes);
+                CommonPythonUtil.createPythonFunctionInfo(invocation, inputNodes, classLoader);
         int[] udtfInputOffsets =
                 inputNodes.keySet().stream()
                         .filter(x -> x instanceof RexInputRef)
@@ -168,7 +172,8 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
             InternalTypeInfo<RowData> outputRowType,
             PythonFunctionInfo pythonFunctionInfo,
             int[] udtfInputOffsets) {
-        boolean isInProcessMode = CommonPythonUtil.isPythonWorkerInProcessMode(pythonConfig);
+        boolean isInProcessMode =
+                CommonPythonUtil.isPythonWorkerInProcessMode(pythonConfig, classLoader);
 
         final RowType inputType = inputRowType.toRowType();
         final RowType outputType = outputRowType.toRowType();
@@ -180,7 +185,9 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
 
         try {
             if (isInProcessMode) {
-                Class clazz = CommonPythonUtil.loadClass(PYTHON_TABLE_FUNCTION_OPERATOR_NAME);
+                Class clazz =
+                        CommonPythonUtil.loadClass(
+                                PYTHON_TABLE_FUNCTION_OPERATOR_NAME, classLoader);
                 Constructor ctor =
                         clazz.getConstructor(
                                 Configuration.class,
@@ -206,7 +213,8 @@ public abstract class CommonExecPythonCorrelate extends ExecNodeBase<RowData>
                                         udtfInputOffsets));
             } else {
                 Class clazz =
-                        CommonPythonUtil.loadClass(EMBEDDED_PYTHON_TABLE_FUNCTION_OPERATOR_NAME);
+                        CommonPythonUtil.loadClass(
+                                EMBEDDED_PYTHON_TABLE_FUNCTION_OPERATOR_NAME, classLoader);
                 Constructor ctor =
                         clazz.getConstructor(
                                 Configuration.class,
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
index 55a8a8cd3d5..4595191332b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupAggregate.java
@@ -175,10 +175,12 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
         PythonAggregateFunctionInfo[] pythonFunctionInfos = aggInfosAndDataViewSpecs.f0;
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         final OneInputStreamOperator<RowData, RowData> operator =
                 getPythonAggregateFunctionOperator(
                         pythonConfig,
+                        planner.getFlinkContext().getClassLoader(),
                         inputRowType,
                         InternalTypeInfo.of(getOutputType()).toRowType(),
                         pythonFunctionInfos,
@@ -196,7 +198,8 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
                         InternalTypeInfo.of(getOutputType()),
                         inputTransform.getParallelism());
 
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
 
@@ -214,6 +217,7 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
     @SuppressWarnings("unchecked")
     private OneInputStreamOperator<RowData, RowData> getPythonAggregateFunctionOperator(
             Configuration config,
+            ClassLoader classLoader,
             RowType inputType,
             RowType outputType,
             PythonAggregateFunctionInfo[] aggregateFunctions,
@@ -222,7 +226,8 @@ public class StreamExecPythonGroupAggregate extends StreamExecAggregateBase {
             long maxIdleStateRetentionTime,
             int indexOfCountStar,
             boolean countStarInserted) {
-        Class<?> clazz = CommonPythonUtil.loadClass(PYTHON_STREAM_AGGREAGTE_OPERATOR_NAME);
+        Class<?> clazz =
+                CommonPythonUtil.loadClass(PYTHON_STREAM_AGGREAGTE_OPERATOR_NAME, classLoader);
         try {
             Constructor<?> ctor =
                     clazz.getConstructor(
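
The managed-memory check takes the same reflective route, because PythonOptions is also part of flink-python. A condensed sketch of what happens with the classloader that isPythonWorkerUsingManagedMemory now receives (in the real code the method only performs the lookup and leaves the declaration to the caller, as the hunks above show):

    try {
        Class<?> optionsClass = CommonPythonUtil.loadClass(PYTHON_OPTIONS_CLASS, classLoader);
        // Unchecked cast, as in the original: the field holds a ConfigOption<Boolean>.
        ConfigOption<Boolean> useManagedMemory =
                (ConfigOption<Boolean>) optionsClass.getField("USE_MANAGED_MEMORY").get(null);
        if (pythonConfig.getBoolean(useManagedMemory)) {
            // Reserve slot-scoped managed memory for the Python worker.
            transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
        }
    } catch (ReflectiveOperationException e) {
        throw new TableException("The field USE_MANAGED_MEMORY is not accessible.", e);
    }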
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
index 179b302941a..3d05d273def 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupTableAggregate.java
@@ -131,10 +131,12 @@ public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData>
         PythonAggregateFunctionInfo[] pythonFunctionInfos = aggInfosAndDataViewSpecs.f0;
         DataViewSpec[][] dataViewSpecs = aggInfosAndDataViewSpecs.f1;
         Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getPythonTableAggregateFunctionOperator(
                         pythonConfig,
+                        planner.getFlinkContext().getClassLoader(),
                         inputRowType,
                         InternalTypeInfo.of(getOutputType()).toRowType(),
                         pythonFunctionInfos,
@@ -153,7 +155,8 @@ public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData>
                         InternalTypeInfo.of(getOutputType()),
                         inputTransform.getParallelism());
 
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
 
@@ -171,6 +174,7 @@ public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData>
     @SuppressWarnings("unchecked")
     private OneInputStreamOperator<RowData, RowData> getPythonTableAggregateFunctionOperator(
             Configuration config,
+            ClassLoader classLoader,
             RowType inputRowType,
             RowType outputRowType,
             PythonAggregateFunctionInfo[] aggregateFunctions,
@@ -179,7 +183,9 @@ public class StreamExecPythonGroupTableAggregate extends ExecNodeBase<RowData>
             long maxIdleStateRetentionTime,
             boolean generateUpdateBefore,
             int indexOfCountStar) {
-        Class<?> clazz = CommonPythonUtil.loadClass(PYTHON_STREAM_TABLE_AGGREGATE_OPERATOR_NAME);
+        Class<?> clazz =
+                CommonPythonUtil.loadClass(
+                        PYTHON_STREAM_TABLE_AGGREGATE_OPERATOR_NAME, classLoader);
         try {
             Constructor<?> ctor =
                     clazz.getConstructor(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
index 8aa55962285..6e210a642b0 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonGroupWindowAggregate.java
@@ -258,7 +258,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
         WindowAssigner<?> windowAssigner = windowAssignerAndTrigger.f0;
         Trigger<?> trigger = windowAssignerAndTrigger.f1;
         final Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         final ExecNodeConfig pythonNodeConfig =
                 ExecNodeConfig.ofNodeConfig(pythonConfig, config.isCompiled());
         boolean isGeneralPythonUDAF =
@@ -289,6 +290,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                             emitStrategy.getAllowLateness(),
                             pythonConfig,
                             pythonNodeConfig,
+                            planner.getFlinkContext().getClassLoader(),
                             shiftTimeZone);
         } else {
             transform =
@@ -306,7 +308,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                             shiftTimeZone);
         }
 
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
         // set KeyType and Selector for state
@@ -436,6 +439,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                     long allowance,
                     Configuration pythonConfig,
                     ExecNodeConfig pythonNodeConfig,
+                    ClassLoader classLoader,
                     ZoneId shiftTimeZone) {
         final int inputCountIndex = aggInfoList.getIndexOfCountStar();
         final boolean countStarInserted = aggInfoList.countStarInserted();
@@ -446,6 +450,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
         OneInputStreamOperator<RowData, RowData> pythonOperator =
                 getGeneralPythonStreamGroupWindowAggregateFunctionOperator(
                         pythonConfig,
+                        classLoader,
                         inputRowType,
                         outputRowType,
                         windowAssigner,
@@ -484,7 +489,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                     ZoneId shiftTimeZone) {
         Class clazz =
                 CommonPythonUtil.loadClass(
-                        ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+                        ARROW_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME,
+                        classLoader);
         RowType userDefinedFunctionInputType =
                 (RowType) Projection.of(udafInputOffsets).project(inputRowType);
         RowType userDefinedFunctionOutputType =
@@ -542,6 +548,7 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
     private OneInputStreamOperator<RowData, RowData>
             getGeneralPythonStreamGroupWindowAggregateFunctionOperator(
                     Configuration config,
+                    ClassLoader classLoader,
                     RowType inputType,
                     RowType outputType,
                     WindowAssigner<?> windowAssigner,
@@ -555,7 +562,8 @@ public class StreamExecPythonGroupWindowAggregate extends StreamExecAggregateBas
                     ZoneId shiftTimeZone) {
         Class clazz =
                 CommonPythonUtil.loadClass(
-                        GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME);
+                        GENERAL_STREAM_PYTHON_GROUP_WINDOW_AGGREGATE_FUNCTION_OPERATOR_NAME,
+                        classLoader);
 
         boolean isRowTime = AggregateUtil.isRowtimeAttribute(window.timeAttribute());
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
index fd507b97d4a..d1057bcab6f 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecPythonOverAggregate.java
@@ -197,7 +197,8 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
         }
         long precedingOffset = -1 * (long) boundValue;
         Configuration pythonConfig =
-                CommonPythonUtil.extractPythonConfiguration(planner.getExecEnv(), config);
+                CommonPythonUtil.extractPythonConfiguration(
+                        planner.getExecEnv(), config, planner.getFlinkContext().getClassLoader());
         OneInputTransformation<RowData, RowData> transform =
                 createPythonOneInputTransformation(
                         inputTransform,
@@ -213,7 +214,8 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                         config,
                         planner.getFlinkContext().getClassLoader());
 
-        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(pythonConfig)) {
+        if (CommonPythonUtil.isPythonWorkerUsingManagedMemory(
+                pythonConfig, planner.getFlinkContext().getClassLoader())) {
             transform.declareManagedMemoryUseCaseAtSlotScope(ManagedMemoryUseCase.PYTHON);
         }
 
@@ -306,7 +308,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                 className =
                         ARROW_PYTHON_OVER_WINDOW_ROWS_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME;
             }
-            Class<?> clazz = CommonPythonUtil.loadClass(className);
+            Class<?> clazz = CommonPythonUtil.loadClass(className, classLoader);
 
             try {
                 Constructor<?> ctor =
@@ -349,7 +351,7 @@ public class StreamExecPythonOverAggregate extends ExecNodeBase<RowData>
                 className =
                         ARROW_PYTHON_OVER_WINDOW_RANGE_PROC_TIME_AGGREGATE_FUNCTION_OPERATOR_NAME;
             }
-            Class<?> clazz = CommonPythonUtil.loadClass(className);
+            Class<?> clazz = CommonPythonUtil.loadClass(className, classLoader);
             try {
                 Constructor<?> ctor =
                         clazz.getConstructor(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
index a949ad2afa6..201407b718a 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/utils/CommonPythonUtil.java
@@ -98,9 +98,9 @@ public class CommonPythonUtil {
 
     private CommonPythonUtil() {}
 
-    public static Class<?> loadClass(String className) {
+    public static Class<?> loadClass(String className, ClassLoader classLoader) {
         try {
-            return Class.forName(className, false, Thread.currentThread().getContextClassLoader());
+            return Class.forName(className, false, classLoader);
         } catch (ClassNotFoundException e) {
             throw new TableException(
                     "The dependency of 'flink-python' is not present on the classpath.", e);
@@ -108,8 +108,8 @@ public class CommonPythonUtil {
     }
 
     public static Configuration extractPythonConfiguration(
-            StreamExecutionEnvironment env, ReadableConfig tableConfig) {
-        Class<?> clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS);
+            StreamExecutionEnvironment env, ReadableConfig tableConfig, ClassLoader classLoader) {
+        Class<?> clazz = loadClass(PYTHON_CONFIG_UTILS_CLASS, classLoader);
         try {
             StreamExecutionEnvironment realEnv = getRealEnvironment(env);
             Method method =
@@ -125,20 +125,27 @@ public class CommonPythonUtil {
     }
 
     public static PythonFunctionInfo createPythonFunctionInfo(
-            RexCall pythonRexCall, Map<RexNode, Integer> inputNodes) {
+            RexCall pythonRexCall, Map<RexNode, Integer> inputNodes, ClassLoader classLoader) {
         SqlOperator operator = pythonRexCall.getOperator();
         try {
             if (operator instanceof ScalarSqlFunction) {
                 return createPythonFunctionInfo(
-                        pythonRexCall, inputNodes, ((ScalarSqlFunction) operator).scalarFunction());
+                        pythonRexCall,
+                        inputNodes,
+                        ((ScalarSqlFunction) operator).scalarFunction(),
+                        classLoader);
             } else if (operator instanceof TableSqlFunction) {
                 return createPythonFunctionInfo(
-                        pythonRexCall, inputNodes, ((TableSqlFunction) operator).udtf());
+                        pythonRexCall,
+                        inputNodes,
+                        ((TableSqlFunction) operator).udtf(),
+                        classLoader);
             } else if (operator instanceof BridgingSqlFunction) {
                 return createPythonFunctionInfo(
                         pythonRexCall,
                         inputNodes,
-                        ((BridgingSqlFunction) operator).getDefinition());
+                        ((BridgingSqlFunction) operator).getDefinition(),
+                        classLoader);
             }
         } catch (InvocationTargetException | IllegalAccessException e) {
             throw new TableException("Method pickleValue accessed failed. ", e);
@@ -147,8 +154,9 @@ public class CommonPythonUtil {
     }
 
     @SuppressWarnings("unchecked")
-    public static boolean isPythonWorkerUsingManagedMemory(Configuration config) {
-        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS);
+    public static boolean isPythonWorkerUsingManagedMemory(
+            Configuration config, ClassLoader classLoader) {
+        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS, classLoader);
         try {
             return config.getBoolean(
                     (ConfigOption<Boolean>) (clazz.getField("USE_MANAGED_MEMORY").get(null)));
@@ -158,8 +166,9 @@ public class CommonPythonUtil {
     }
 
     @SuppressWarnings("unchecked")
-    public static boolean isPythonWorkerInProcessMode(Configuration config) {
-        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS);
+    public static boolean isPythonWorkerInProcessMode(
+            Configuration config, ClassLoader classLoader) {
+        Class<?> clazz = loadClass(PYTHON_OPTIONS_CLASS, classLoader);
         try {
             return config.getString(
                             (ConfigOption<String>)
@@ -337,7 +346,8 @@ public class CommonPythonUtil {
                         });
     }
 
-    private static byte[] convertLiteralToPython(RexLiteral o, SqlTypeName typeName)
+    private static byte[] convertLiteralToPython(
+            RexLiteral o, SqlTypeName typeName, ClassLoader classLoader)
             throws InvocationTargetException, IllegalAccessException {
         byte type;
         Object value;
@@ -396,16 +406,18 @@ public class CommonPythonUtil {
                     throw new RuntimeException("Unsupported type " + typeName);
             }
         }
-        loadPickleValue();
+        loadPickleValue(classLoader);
         return (byte[]) pickleValue.invoke(null, value, type);
     }
 
-    private static void loadPickleValue() {
+    private static void loadPickleValue(ClassLoader classLoader) {
         if (pickleValue == null) {
             synchronized (CommonPythonUtil.class) {
                 if (pickleValue == null) {
                     Class<?> clazz =
-                            loadClass("org.apache.flink.api.common.python.PythonBridgeUtils");
+                            loadClass(
+                                    "org.apache.flink.api.common.python.PythonBridgeUtils",
+                                    classLoader);
                     try {
                         pickleValue = clazz.getMethod("pickleValue", Object.class, byte.class);
                     } catch (NoSuchMethodException e) {
@@ -419,18 +431,21 @@ public class CommonPythonUtil {
     private static PythonFunctionInfo createPythonFunctionInfo(
             RexCall pythonRexCall,
             Map<RexNode, Integer> inputNodes,
-            FunctionDefinition functionDefinition)
+            FunctionDefinition functionDefinition,
+            ClassLoader classLoader)
             throws InvocationTargetException, IllegalAccessException {
         ArrayList<Object> inputs = new ArrayList<>();
         for (RexNode operand : pythonRexCall.getOperands()) {
             if (operand instanceof RexCall) {
                 RexCall childPythonRexCall = (RexCall) operand;
                 PythonFunctionInfo argPythonInfo =
-                        createPythonFunctionInfo(childPythonRexCall, inputNodes);
+                        createPythonFunctionInfo(childPythonRexCall, inputNodes, classLoader);
                 inputs.add(argPythonInfo);
             } else if (operand instanceof RexLiteral) {
                 RexLiteral literal = (RexLiteral) operand;
-                inputs.add(convertLiteralToPython(literal, literal.getType().getSqlTypeName()));
+                inputs.add(
+                        convertLiteralToPython(
+                                literal, literal.getType().getSqlTypeName(), classLoader));
             } else {
                 if (inputNodes.containsKey(operand)) {
                     inputs.add(inputNodes.get(operand));
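
The two loadClass hunks above are the heart of the series: Class.forName against the thread context classloader cannot see jars registered at runtime (for example via the ADD JAR syntax), whereas the user classloader carried by the planner context can. The repaired lookup, shown standalone for reference:

    public static Class<?> loadClass(String className, ClassLoader classLoader) {
        try {
            // 'false': link only, do not run static initializers yet; the class is
            // initialized later, when the operator is actually constructed.
            return Class.forName(className, false, classLoader);
        } catch (ClassNotFoundException e) {
            throw new TableException(
                    "The dependency of 'flink-python' is not present on the classpath.", e);
        }
    }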
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
index 397476b0ebb..ad1b15e8ea8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/physical/common/CommonPhysicalMatchRule.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalMatch;
 import org.apache.flink.table.planner.plan.trait.FlinkRelDistribution;
 import org.apache.flink.table.planner.plan.utils.MatchUtil.AggregationPatternVariableFinder;
 import org.apache.flink.table.planner.plan.utils.RexDefaultVisitor;
+import org.apache.flink.table.planner.utils.ShortcutUtils;
 
 import org.apache.calcite.plan.RelOptCluster;
 import org.apache.calcite.plan.RelOptRule;
@@ -87,7 +88,7 @@ public abstract class CommonPhysicalMatchRule extends ConverterRule {
             Class.forName(
                     "org.apache.flink.cep.pattern.Pattern",
                     false,
-                    Thread.currentThread().getContextClassLoader());
+                    ShortcutUtils.unwrapContext(rel).getClassLoader());
         } catch (ClassNotFoundException e) {
             throw new TableException(
                     "MATCH RECOGNIZE clause requires flink-cep dependency to be present on the classpath.",
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 8fb8f80d37d..2681ed64fad 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -473,7 +473,7 @@ abstract class PlannerBase(
     tableConfig.set(TABLE_QUERY_CURRENT_DATABASE, currentDatabase)
 
     // We pass only the configuration to avoid reconfiguration with the rootConfiguration
-    getExecEnv.configure(tableConfig.getConfiguration, Thread.currentThread().getContextClassLoader)
+    getExecEnv.configure(tableConfig.getConfiguration, classLoader)
 
     // Use config parallelism to override env parallelism.
     val defaultParallelism =
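
For reference, the Java shape of the Scala call above, assuming classLoader is the planner's user classloader and tableConfig its TableConfig:

    // Reconfigure the exec environment with only the table configuration; options
    // that name classes are now resolved against the user classloader.
    getExecEnv().configure(tableConfig.getConfiguration(), classLoader);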
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
index b1b58d803c0..ef387d485d1 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalSortRule.scala
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.rules.physical.batch
 import org.apache.flink.annotation.Experimental
 import org.apache.flink.configuration.ConfigOption
 import org.apache.flink.configuration.ConfigOptions.key
-import org.apache.flink.table.planner.calcite.FlinkContext
 import org.apache.flink.table.planner.plan.`trait`.FlinkRelDistribution
 import org.apache.flink.table.planner.plan.nodes.FlinkConventions
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalSort