You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/12 04:10:35 UTC

[incubator-seatunnel] branch dev updated: [Feature][Flink-SQL] Support connector dynamic loading for FlinkSQL job and add Flink SQL jdbc connector (#1850)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 53e7a8ce [Feature][Flink-SQL] Support connector dynamic loading for FlinkSQL job and add Flink SQL jdbc connector (#1850)
53e7a8ce is described below

commit 53e7a8ce92313266ce38387c7aa1239ad3ddff5f
Author: legendtkl <ta...@gmail.com>
AuthorDate: Thu May 12 12:10:29 2022 +0800

    [Feature][Flink-SQL] Support connector dynamic loading for FlinkSQL job and add Flink SQL jdbc connector (#1850)
    
    * Support connector dynamic loading for FlinkSQL job and add FlinkSQL jdbc connector
---
 docs/en/connector/flink-sql/Jdbc.md                |  65 ++++++++++++
 seatunnel-connectors/pom.xml                       |   2 +
 .../pom.xml                                        |  56 ++++++----
 .../flink-sql-connector-jdbc}/pom.xml              |  35 +++---
 .../{ => seatunnel-connectors-flink-sql}/pom.xml   |  32 +++---
 .../core/sql/classloader/CustomClassLoader.java    |  51 +++++++++
 .../apache/seatunnel/core/sql/job/Executor.java    | 117 ++++++++++++++++++++-
 .../sql/classloader/CustomClassLoaderTest.java     |  35 ++++++
 seatunnel-dist/src/main/assembly/assembly-bin.xml  |  10 ++
 9 files changed, 341 insertions(+), 62 deletions(-)

diff --git a/docs/en/connector/flink-sql/Jdbc.md b/docs/en/connector/flink-sql/Jdbc.md
new file mode 100644
index 00000000..3e8413c2
--- /dev/null
+++ b/docs/en/connector/flink-sql/Jdbc.md
@@ -0,0 +1,65 @@
+# Flink SQL JDBC Connector
+
+## Description
+
+We can use the Flink SQL JDBC Connector to connect to a JDBC database. Refer to the [Flink SQL JDBC Connector](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/index.html) for more information.
+
+
+## Usage
+
+### 1. download driver
+A driver dependency is also required to connect to the target database. The following drivers are currently supported:
+
+| Driver     | Group Id	         | Artifact Id	        | JAR           |
+|------------|-------------------|----------------------|---------------|
+| MySQL	     | mysql	         | mysql-connector-java | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| PostgreSQL | org.postgresql	 | postgresql	        | [Download](https://jdbc.postgresql.org/download.html) |
+| Derby	     | org.apache.derby	 | derby	            | [Download](http://db.apache.org/derby/derby_downloads.html) |
+
+After downloading the driver jar, place it in $FLINK_HOME/lib/.
+
+### 2. prepare data
+Start a MySQL server locally, then create a database named "test" and, inside it, a table named "test_table".
+
+The table "test_table" can be created with the following SQL:
+```sql
+CREATE TABLE IF NOT EXISTS `test_table`(
+   `id` INT UNSIGNED AUTO_INCREMENT,
+   `name` VARCHAR(100) NOT NULL,
+   PRIMARY KEY ( `id` )
+)ENGINE=InnoDB DEFAULT CHARSET=utf8;
+```
+
+Insert some data into the table "test_table".
+
+### 3. seatunnel config
+Prepare a SeaTunnel config file with the following content:
+```sql
+SET table.dml-sync = true;
+
+CREATE TABLE test (
+  id BIGINT,
+  name STRING
+) WITH (
+'connector'='jdbc',
+  'url' = 'jdbc:mysql://localhost:3306/test',
+  'table-name' = 'test_table',
+  'username' = '<replace with your username>',
+  'password' = '<replace with your password>'
+);
+
+CREATE TABLE print_table (
+  id BIGINT,
+  name STRING
+) WITH (
+  'connector' = 'print',
+  'sink.parallelism' = '1'
+);
+
+INSERT INTO print_table SELECT * FROM test;
+```
+
+### 4. run job
+```bash
+./bin/start-seatunnel-sql.sh --config <path/to/your/config>
+```
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/pom.xml
index 69a02445..ad996224 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/pom.xml
@@ -35,6 +35,8 @@
         <module>seatunnel-connectors-flink-dist</module>
         <module>seatunnel-connectors-spark</module>
         <module>seatunnel-connectors-spark-dist</module>
+        <module>seatunnel-connectors-flink-sql</module>
+        <module>seatunnel-connectors-flink-sql-dist</module>
     </modules>
 
 </project>
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-sql-dist/pom.xml
similarity index 51%
copy from seatunnel-connectors/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink-sql-dist/pom.xml
index 69a02445..a5b26eac 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-sql-dist/pom.xml
@@ -1,40 +1,58 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel</artifactId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
+  <parent>
     <artifactId>seatunnel-connectors</artifactId>
-    <packaging>pom</packaging>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>seatunnel-connectors-flink-sql-dist</artifactId>
 
-    <modules>
-        <module>seatunnel-connectors-flink</module>
-        <module>seatunnel-connectors-flink-dist</module>
-        <module>seatunnel-connectors-spark</module>
-        <module>seatunnel-connectors-spark-dist</module>
-    </modules>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.seatunnel</groupId>
+      <artifactId>flink-sql-connector-jdbc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+  </dependencies>
 
-</project>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-connector</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              <type>jar</type>
+              <includeTypes>jar</includeTypes>
+              <outputDirectory>${project.build.directory}/lib</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-sql/flink-sql-connector-jdbc/pom.xml
similarity index 66%
copy from seatunnel-connectors/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink-sql/flink-sql-connector-jdbc/pom.xml
index 69a02445..c8c25337 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-sql/flink-sql-connector-jdbc/pom.xml
@@ -1,40 +1,35 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel</artifactId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>seatunnel-connectors</artifactId>
-    <packaging>pom</packaging>
+  <parent>
+    <artifactId>seatunnel-connectors-flink-sql</artifactId>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
 
-    <modules>
-        <module>seatunnel-connectors-flink</module>
-        <module>seatunnel-connectors-flink-dist</module>
-        <module>seatunnel-connectors-spark</module>
-        <module>seatunnel-connectors-spark-dist</module>
-    </modules>
+  <artifactId>flink-sql-connector-jdbc</artifactId>
 
-</project>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-connector-jdbc_2.11</artifactId>
+      <version>${flink.version}</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors/pom.xml b/seatunnel-connectors/seatunnel-connectors-flink-sql/pom.xml
similarity index 69%
copy from seatunnel-connectors/pom.xml
copy to seatunnel-connectors/seatunnel-connectors-flink-sql/pom.xml
index 69a02445..79be2c28 100644
--- a/seatunnel-connectors/pom.xml
+++ b/seatunnel-connectors/seatunnel-connectors-flink-sql/pom.xml
@@ -1,40 +1,34 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-
     Licensed to the Apache Software Foundation (ASF) under one or more
     contributor license agreements.  See the NOTICE file distributed with
     this work for additional information regarding copyright ownership.
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-
        http://www.apache.org/licenses/LICENSE-2.0
-
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     See the License for the specific language governing permissions and
     limitations under the License.
-
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel</artifactId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
+  <parent>
     <artifactId>seatunnel-connectors</artifactId>
-    <packaging>pom</packaging>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>seatunnel-connectors-flink-sql</artifactId>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>flink-sql-connector-jdbc</module>
+  </modules>
 
-    <modules>
-        <module>seatunnel-connectors-flink</module>
-        <module>seatunnel-connectors-flink-dist</module>
-        <module>seatunnel-connectors-spark</module>
-        <module>seatunnel-connectors-spark-dist</module>
-    </modules>
 
-</project>
+</project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
new file mode 100644
index 00000000..5777805a
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql.classloader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+
+// TODO: maybe a unified plugin-style discovery mechanism is better.
+public class CustomClassLoader extends URLClassLoader {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(CustomClassLoader.class);
+
+    public CustomClassLoader() {
+        super(new URL[0]);
+    }
+
+    /*
+     * If a table is declared in 'create table' with connector 'xxx' but is never referenced in the job, that is,
+     * never used in an 'insert into' statement, the connector 'xxx' is not needed by Flink.
+     * So failing to load it may be acceptable. If it is needed, the error will show up in the Flink logs.
+     *
+     * Refer https://github.com/apache/incubator-seatunnel/pull/1850
+     */
+    public void addJar(Path jarPath) {
+        try {
+            this.addURL(jarPath.toUri().toURL());
+        } catch (MalformedURLException e) {
+            LOGGER.error("Failed to add jar to classloader. Jar: {}", jarPath, e);
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
index 2fd9612f..2320e31b 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
@@ -17,30 +17,55 @@
 
 package org.apache.seatunnel.core.sql.job;
 
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.core.sql.classloader.CustomClassLoader;
 import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
-
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.ServiceLoader;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 public class Executor {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(Executor.class);
+
     private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
     private static final int FLINK_SQL_SET_OPERANDS = 3;
 
+    private static CustomClassLoader CLASSLOADER = new CustomClassLoader();
+
+    private static final String CONNECTOR_IDENTIFIER = "connector";
+    private static final String SQL_CONNECTOR_PREFIX = "flink-sql";
+    private static final String CONNECTOR_JAR_PREFIX = "flink-sql-connector-";
+
     private Executor() {
         throw new IllegalStateException("Utility class");
     }
@@ -50,14 +75,31 @@ public class Executor {
         EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);
 
-        StatementSet statementSet = handleStatements(jobInfo.getJobContent(), tEnv);
-        statementSet.execute();
+        final Configuration executionEnvConfiguration;
+        try {
+            executionEnvConfiguration =
+                  (Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
+                    "getConfiguration")).orElseThrow(() -> new RuntimeException("can't find " +
+                    "method: getConfiguration")).invoke(env);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(CLASSLOADER);
+
+            StatementSet statementSet = handleStatements(jobInfo.getJobContent(), tEnv, executionEnvConfiguration);
+            statementSet.execute();
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
     }
 
     /**
      * Handle each statement.
      */
-    private static StatementSet handleStatements(String workFlowContent, StreamTableEnvironment tEnv) {
+    private static StatementSet handleStatements(String workFlowContent, StreamTableEnvironment tEnv, Configuration executionEnvConfiguration) {
 
         StatementSet statementSet = tEnv.createStatementSet();
         TableEnvironmentImpl stEnv = (TableEnvironmentImpl) tEnv;
@@ -75,12 +117,79 @@ public class Executor {
             if (op instanceof CatalogSinkModifyOperation) {
                 statementSet.addInsertSql(stmt);
             } else {
+                if (op instanceof CreateTableOperation) {
+                    String connectorType = ((CreateTableOperation) op).getCatalogTable().getOptions().get(CONNECTOR_IDENTIFIER);
+                    loadConnector(connectorType, executionEnvConfiguration);
+                }
+
                 tEnv.executeSql(stmt);
             }
         }
         return statementSet;
     }
 
+    /*
+     * The loadConnector method is best-effort: it does not throw an exception if the connector is not found.
+     * If a table is declared in 'create table' with connector 'xxx' but is never referenced in the job, that is,
+     * never used in an 'insert into' statement, the connector 'xxx' is not needed by Flink.
+     * So failing to load it may be acceptable. If it is needed, the error will show up in the Flink logs.
+     *
+     * Refer https://github.com/apache/incubator-seatunnel/pull/1850
+     */
+    private static void loadConnector(String connectorType, Configuration configuration) {
+        Iterator<Factory> factories = ServiceLoader.load(Factory.class, CLASSLOADER).iterator();
+        while (factories.hasNext()) {
+            Factory factory = factories.next();
+
+            /**
+             * Handles two cases:
+             * 1. Flink built-in connectors.
+             * 2. Connectors that have already been placed on the classpath.
+             */
+            if (factory.factoryIdentifier().equals(connectorType)) {
+                return;
+            }
+        }
+
+        Common.setDeployMode(DeployMode.CLIENT.getName());
+        File connectorDir = Common.connectorJarDir(SQL_CONNECTOR_PREFIX).toFile();
+        if (!connectorDir.exists() || connectorDir.listFiles() == null) {
+            return;
+        }
+
+        List<File> connectorFiles = Arrays.stream(connectorDir.listFiles())
+            .filter(file -> file.getName().startsWith(CONNECTOR_JAR_PREFIX + connectorType))
+            .collect(Collectors.toList());
+
+        if (connectorFiles.size() > 1) {
+            LOGGER.warn("Found more than one connector jars for {}. Only the first one will be loaded.", connectorType);
+        }
+
+        File connectorFile = connectorFiles.size() >= 1 ? connectorFiles.get(0) : null;
+
+        if (connectorFile != null) {
+            // handleStatements needs this.
+            CLASSLOADER.addJar(connectorFile.toPath());
+
+            List<String> jars = configuration.get(PipelineOptions.JARS);
+            jars = jars == null ? new ArrayList<>() : jars;
+
+            List<String> classpath = configuration.get(PipelineOptions.CLASSPATHS);
+            classpath = classpath == null ? new ArrayList<>() : classpath;
+
+            try {
+                String connectorURL = connectorFile.toPath().toUri().toURL().toString();
+                jars.add(connectorURL);
+                classpath.add(connectorURL);
+
+                configuration.set(PipelineOptions.JARS, jars);
+                configuration.set(PipelineOptions.CLASSPATHS, classpath);
+            } catch (MalformedURLException ignored) {
+                LOGGER.error("Failed to load connector {}. Connector file: {}", connectorType, connectorFile.getAbsolutePath());
+            }
+        }
+    }
+
     @VisibleForTesting
     static Optional<Pair<String, String>> parseSetOperation(String stmt) {
         stmt = stmt.trim();
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoaderTest.java b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoaderTest.java
new file mode 100644
index 00000000..b8d28ff8
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/test/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoaderTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.sql.classloader;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+
+public class CustomClassLoaderTest {
+
+    @Test
+    public void testAddJar() {
+        CustomClassLoader classLoader = new CustomClassLoader();
+
+        File file = new File(ClassLoader.getSystemResource("flink.sql.conf.template").getPath());
+        classLoader.addJar(file.toPath());
+        Assert.assertEquals(classLoader.getURLs()[0].toString(), file.toURI().toString());
+    }
+}
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index aaa25e23..e003698f 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -98,6 +98,16 @@
             </excludes>
             <outputDirectory>/connectors/flink</outputDirectory>
         </fileSet>
+        <fileSet>
+            <directory>../seatunnel-connectors/seatunnel-connectors-flink-sql-dist/target/lib</directory>
+            <includes>
+                <include>flink-sql-connector*.jar</include>
+            </includes>
+            <excludes>
+                <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
+            </excludes>
+            <outputDirectory>/connectors/flink-sql</outputDirectory>
+        </fileSet>
         <fileSet>
             <directory>../seatunnel-connectors/seatunnel-connectors-spark-dist/target/lib</directory>
             <includes>