You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/09/17 05:10:27 UTC

[incubator-seatunnel] branch dev updated: [Improve][e2e] Add driver-jar to lib (#2719)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d64d452c8 [Improve][e2e] Add driver-jar to lib (#2719)
d64d452c8 is described below

commit d64d452c86534547bbd17ef8659058a58db670b0
Author: liugddx <80...@qq.com>
AuthorDate: Sat Sep 17 13:10:21 2022 +0800

    [Improve][e2e] Add driver-jar to lib (#2719)
    
    * [Improve][e2e] add driver-jar to lib
    
    * discover third-party jars
    
    * Remove excess code
    
    * modify e2e test case
    
    * modify e2e test case
    
    * fix e2e error: if Docker runs on another machine, the test case fails
    
    * fix some bug
    
    * fix some bug
    
    * fix some bug
    
    * use root to start spark container.
    
    * move `addURLToClassLoader` into  `registerPlugin`
    
    * move `ADD_URL_TO_CLASSLOADER` into  `FlinkCommon`
---
 .../org/apache/seatunnel/common/config/Common.java |  2 +-
 seatunnel-connectors-v2/connector-jdbc/pom.xml     |  3 +-
 .../core/starter/flink/config/FlinkCommon.java     | 42 +++++++++++++
 .../execution/AbstractPluginExecuteProcessor.java  | 14 +----
 .../starter/flink/execution/FlinkExecution.java    |  8 ++-
 .../seatunnel/e2e/common/AbstractContainer.java    |  6 ++
 .../e2e/common/AbstractSparkContainer.java         |  5 +-
 .../connector-jdbc-flink-e2e/pom.xml               |  3 +-
 .../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java      | 18 ++++--
 .../seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java    |  9 ++-
 .../e2e/flink/v2/jdbc/JdbcGreenplumIT.java         | 40 +++++++-----
 .../seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java | 71 +++++++++++----------
 .../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java   | 18 ++++--
 .../seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java    |  8 +--
 .../e2e/spark/v2/jdbc/JdbcGreenplumIT.java         | 41 +++++++-----
 .../seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java | 73 +++++++++++-----------
 .../plugin/discovery/AbstractPluginDiscovery.java  | 14 ++---
 17 files changed, 223 insertions(+), 152 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
index 294cc4965..28b64a728 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/Common.java
@@ -145,7 +145,7 @@ public class Common {
     /**
      * return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'.
      */
-    public static List<Path> getPluginsJarDependencies(){
+    public static List<Path> getPluginsJarDependencies() {
         Path pluginRootDir = Common.pluginRootDir();
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
             return Collections.emptyList();
diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml
index 68b6ad284..3aa3e9404 100644
--- a/seatunnel-connectors-v2/connector-jdbc/pom.xml
+++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -28,7 +28,7 @@
     <modelVersion>4.0.0</modelVersion>
 
     <artifactId>connector-jdbc</artifactId>
-    
+
     <properties>
         <phoenix.version>5.2.5-HBase-2.x</phoenix.version>
         <mysql.version>8.0.16</mysql.version>
@@ -49,6 +49,7 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
             <version>${postgresql.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.dameng</groupId>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java
new file mode 100644
index 000000000..7d6dcac55
--- /dev/null
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkCommon.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.flink.config;
+
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.function.BiConsumer;
+
+public class FlinkCommon {
+
+    /**
+     * Add jar url to classloader. The different engine should have different logic to add url into
+     * their own classloader
+     */
+    public static BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER = (classLoader, url) -> {
+        if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
+            URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
+            ReflectionUtils.invoke(c, "addURL", url);
+        } else if (classLoader instanceof URLClassLoader) {
+            ReflectionUtils.invoke(classLoader, "addURL", url);
+        } else {
+            throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
+        }
+    };
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
index 1b680b9e8..04a512b99 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.java
@@ -21,7 +21,7 @@ import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
 import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
 
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
+import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.util.TableUtil;
 
@@ -33,7 +33,6 @@ import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 
 import java.net.URL;
-import java.net.URLClassLoader;
 import java.util.List;
 import java.util.Optional;
 import java.util.function.BiConsumer;
@@ -47,16 +46,7 @@ public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecute
     protected static final String ENGINE_TYPE = "seatunnel";
     protected static final String PLUGIN_NAME = "plugin_name";
 
-    protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = (classLoader, url) -> {
-        if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
-            URLClassLoader c = (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
-            ReflectionUtils.invoke(c, "addURL", url);
-        } else if (classLoader instanceof URLClassLoader) {
-            ReflectionUtils.invoke(classLoader, "addURL", url);
-        } else {
-            throw new RuntimeException("Unsupported classloader: " + classLoader.getClass().getName());
-        }
-    };
+    protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = FlinkCommon.ADD_URL_TO_CLASSLOADER;
 
     protected AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment,
                                              JobContext jobContext,
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index a4c52638a..51b36e7d2 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.Constants;
 import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
 import org.apache.seatunnel.core.starter.flink.config.FlinkEnvironmentFactory;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
@@ -54,10 +55,10 @@ public class FlinkExecution implements TaskExecution {
         this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
         JobContext jobContext = new JobContext();
         jobContext.setJobMode(flinkEnvironment.getJobMode());
+        registerPlugin();
         this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SOURCE));
         this.transformPluginExecuteProcessor = new TransformExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.TRANSFORM));
         this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList(Constants.SINK));
-        registerPlugin();
     }
 
     @Override
@@ -75,7 +76,7 @@ public class FlinkExecution implements TaskExecution {
         }
     }
 
-    private void registerPlugin(){
+    private void registerPlugin() {
         List<URL> pluginsJarDependencies = Common.getPluginsJarDependencies().stream()
             .map(Path::toUri)
             .map(uri -> {
@@ -86,6 +87,9 @@ public class FlinkExecution implements TaskExecution {
                 }
             })
             .collect(Collectors.toList());
+
+        pluginsJarDependencies.forEach(url -> FlinkCommon.ADD_URL_TO_CLASSLOADER.accept(Thread.currentThread().getContextClassLoader(), url));
+
         flinkEnvironment.registerPlugin(pluginsJarDependencies);
     }
 }
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
index 71e93ab92..c1550d993 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractContainer.java
@@ -64,6 +64,10 @@ public abstract class AbstractContainer {
 
     protected abstract List<String> getExtraStartShellCommands();
 
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        //do nothing
+    }
+
     protected void copySeaTunnelStarter(GenericContainer<?> container) {
         ContainerUtil.copySeaTunnelStarter(container,
             this.startModuleName,
@@ -81,6 +85,8 @@ public abstract class AbstractContainer {
             getConnectorNamePrefix(),
             getConnectorType(),
             getSeaTunnelHomeInContainer());
+        // execute extra commands
+        executeExtraCommands(container);
         return executeCommand(container, confInContainerPath);
     }
 
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
index 6cb49b5a0..3c10c8181 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java
@@ -61,7 +61,8 @@ public abstract class AbstractSparkContainer extends AbstractContainer {
             .withNetworkAliases("spark-master")
             .withExposedPorts()
             .withEnv("SPARK_MODE", "master")
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
         // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
         // start a worker.
         Startables.deepStart(Stream.of(master)).join();
@@ -79,7 +80,7 @@ public abstract class AbstractSparkContainer extends AbstractContainer {
     @Override
     protected List<String> getExtraStartShellCommands() {
         return Arrays.asList("--master local",
-                             "--deploy-mode client");
+            "--deploy-mode client");
     }
 
     public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException {
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index eb9ed148f..1d46370b6 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -72,6 +72,7 @@
             <groupId>org.postgresql</groupId>
             <artifactId>postgresql</artifactId>
             <version>${postgresql.version}</version>
+            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.dameng</groupId>
@@ -80,4 +81,4 @@
         </dependency>
     </dependencies>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index f83781571..9164a7f5c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -29,6 +29,7 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
@@ -47,14 +48,15 @@ import java.util.stream.Stream;
 public class FakeSourceToJdbcIT extends FlinkContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
     private PostgreSQLContainer<?> psl;
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
         Startables.deepStart(Stream.of(psl)).join();
         LOGGER.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
@@ -70,8 +72,8 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
         try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
             Statement statement = connection.createStatement();
             String sql = "CREATE TABLE test (\n" +
-                    "  name varchar(255) NOT NULL\n" +
-                    ")";
+                "  name varchar(255) NOT NULL\n" +
+                ")";
             statement.execute(sql);
         } catch (SQLException e) {
             throw new RuntimeException("Initializing PostgreSql table failed!", e);
@@ -101,4 +103,10 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
             psl.stop();
         }
     }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
index 861200526..67969ae81 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java
@@ -51,8 +51,7 @@ public class JdbcDmdbIT extends FlinkContainer {
     private static final String DOCKER_IMAGE = "laglangyue/dmdb8";
     private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
     private static final String HOST = "flink_e2e_dmdb";
-    private static final String LOCAL_HOST = "localhost";
-    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String URL = "jdbc:dm://%s:5236";
     private static final String USERNAME = "SYSDBA";
     private static final String PASSWORD = "SYSDBA";
     private static final String DATABASE = "SYSDBA";
@@ -81,7 +80,8 @@ public class JdbcDmdbIT extends FlinkContainer {
     }
 
     private void initializeJdbcConnection() throws SQLException {
-        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+        jdbcConnection = DriverManager.getConnection(String.format(
+            URL, dbServer.getHost()), USERNAME, PASSWORD);
     }
 
     /**
@@ -110,8 +110,7 @@ public class JdbcDmdbIT extends FlinkContainer {
     }
 
     private void assertHasData(String table) {
-        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
-            Statement statement = connection.createStatement();
+        try (Statement statement = jdbcConnection.createStatement();) {
             String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
             ResultSet source = statement.executeQuery(sql);
             Assertions.assertTrue(source.next());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
index ac4366501..366106ae8 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
@@ -51,14 +51,13 @@ public class JdbcGreenplumIT extends FlinkContainer {
     private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
     private static final String GREENPLUM_CONTAINER_HOST = "flink_e2e_greenplum";
     private static final int GREENPLUM_CONTAINER_PORT = 5432;
-    private static final String GREENPLUM_HOST = "localhost";
     private static final int GREENPLUM_PORT = 5435;
     private static final String GREENPLUM_USER = "tester";
     private static final String GREENPLUM_PASSWORD = "pivotal";
     private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
-    private static final String GREENPLUM_JDBC_URL = String.format(
-            "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+    private static final String GREENPLUM_JDBC_URL = "jdbc:postgresql://%s:%s/testdb";
     private static final List<List> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     private GenericContainer<?> greenplumServer;
     private Connection jdbcConnection;
@@ -66,11 +65,11 @@ public class JdbcGreenplumIT extends FlinkContainer {
     @BeforeEach
     public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
         greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
-                .withNetwork(NETWORK)
-                .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+            .withNetwork(NETWORK)
+            .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
         greenplumServer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
+            String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(greenplumServer)).join();
         log.info("Greenplum container started");
         // wait for Greenplum fully start
@@ -97,28 +96,29 @@ public class JdbcGreenplumIT extends FlinkContainer {
             ResultSet resultSet = statement.executeQuery(sql);
             while (resultSet.next()) {
                 result.add(Arrays.asList(
-                        resultSet.getInt(1),
-                        resultSet.getString(2)));
+                    resultSet.getInt(1),
+                    resultSet.getString(2)));
             }
         }
         Assertions.assertIterableEquals(TEST_DATASET, result);
     }
 
     private void initializeJdbcConnection() throws SQLException {
-        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
-                GREENPLUM_USER, GREENPLUM_PASSWORD);
+        jdbcConnection = DriverManager.getConnection(String.format(
+                GREENPLUM_JDBC_URL, greenplumServer.getHost(), GREENPLUM_PORT),
+            GREENPLUM_USER, GREENPLUM_PASSWORD);
     }
 
     private void initializeJdbcTable() throws SQLException {
         try (Statement statement = jdbcConnection.createStatement()) {
             String createSource = "CREATE TABLE source (\n" +
-                    "age INT NOT NULL,\n" +
-                    "name VARCHAR(255) NOT NULL\n" +
-                    ")";
+                "age INT NOT NULL,\n" +
+                "name VARCHAR(255) NOT NULL\n" +
+                ")";
             String createSink = "CREATE TABLE sink (\n" +
-                    "age INT NOT NULL,\n" +
-                    "name VARCHAR(255) NOT NULL\n" +
-                    ")";
+                "age INT NOT NULL,\n" +
+                "name VARCHAR(255) NOT NULL\n" +
+                ")";
             statement.execute(createSource);
             statement.execute(createSink);
         }
@@ -158,4 +158,10 @@ public class JdbcGreenplumIT extends FlinkContainer {
             jdbcConnection.close();
         }
     }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
index e71e1a505..1fdc96a86 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
@@ -48,12 +48,11 @@ public class JdbcPhoenixIT extends FlinkContainer {
     private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
 
     private static final String PHOENIX_CONTAINER_HOST = "flink_e2e_phoenix_sink";
-    private static final String PHOENIX_HOST = "localhost";
 
     private static final int PHOENIX_PORT = 8764;
     private static final int PHOENIX_CONTAINER_PORT = 8765;
 
-    private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF";
     private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
 
     private GenericContainer<?> phoenixServer;
@@ -63,11 +62,11 @@ public class JdbcPhoenixIT extends FlinkContainer {
     @BeforeEach
     public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
         phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
-                .withNetwork(NETWORK)
-                .withNetworkAliases(PHOENIX_CONTAINER_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+            .withNetwork(NETWORK)
+            .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
         phoenixServer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+            String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(phoenixServer)).join();
         initializeJdbcConnection();
         log.info("phoenix container started");
@@ -87,13 +86,13 @@ public class JdbcPhoenixIT extends FlinkContainer {
             ResultSet resultSet = statement.executeQuery(sql);
             while (resultSet.next()) {
                 result.add(Arrays.asList(
-                        resultSet.getString(1),
-                        resultSet.getBoolean(2),
-                        resultSet.getDouble(3),
-                        resultSet.getFloat(4),
-                        resultSet.getShort(5),
-                        resultSet.getInt(6),
-                        resultSet.getInt(7)));
+                    resultSet.getString(1),
+                    resultSet.getBoolean(2),
+                    resultSet.getDouble(3),
+                    resultSet.getFloat(4),
+                    resultSet.getShort(5),
+                    resultSet.getInt(6),
+                    resultSet.getInt(7)));
             }
         }
         Assertions.assertIterableEquals(generateTestDataset(), result);
@@ -101,30 +100,30 @@ public class JdbcPhoenixIT extends FlinkContainer {
 
     private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
         Class.forName(PHOENIX_JDBC_DRIVER);
-        connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+        connection = DriverManager.getConnection(String.format(PHOENIX_CONNECT_URL, phoenixServer.getHost(), PHOENIX_PORT));
     }
 
     private void initializePhoenixTable() {
         try {
             Statement statement = connection.createStatement();
             String createSource = "CREATE TABLE test.source (\n" +
-                    "\tf1 VARCHAR PRIMARY KEY,\n" +
-                    "\tf2 BOOLEAN,\n" +
-                    "\tf3 UNSIGNED_DOUBLE,\n" +
-                    "\tf4 UNSIGNED_FLOAT,\n" +
-                    "\tf5 UNSIGNED_SMALLINT,\n" +
-                    "\tf6 INTEGER,\n" +
-                    "\tf7 UNSIGNED_INT\n" +
-                    ")";
+                "\tf1 VARCHAR PRIMARY KEY,\n" +
+                "\tf2 BOOLEAN,\n" +
+                "\tf3 UNSIGNED_DOUBLE,\n" +
+                "\tf4 UNSIGNED_FLOAT,\n" +
+                "\tf5 UNSIGNED_SMALLINT,\n" +
+                "\tf6 INTEGER,\n" +
+                "\tf7 UNSIGNED_INT\n" +
+                ")";
             String createSink = "CREATE TABLE test.sink (\n" +
-                    "\tf1 VARCHAR PRIMARY KEY,\n" +
-                    "\tf2 BOOLEAN,\n" +
-                    "\tf3 UNSIGNED_DOUBLE,\n" +
-                    "\tf4 UNSIGNED_FLOAT,\n" +
-                    "\tf5 UNSIGNED_SMALLINT,\n" +
-                    "\tf6 INTEGER,\n" +
-                    "\tf7 UNSIGNED_INT\n" +
-                    ")";
+                "\tf1 VARCHAR PRIMARY KEY,\n" +
+                "\tf2 BOOLEAN,\n" +
+                "\tf3 UNSIGNED_DOUBLE,\n" +
+                "\tf4 UNSIGNED_FLOAT,\n" +
+                "\tf5 UNSIGNED_SMALLINT,\n" +
+                "\tf6 INTEGER,\n" +
+                "\tf7 UNSIGNED_INT\n" +
+                ")";
             statement.execute(createSource);
             statement.execute(createSink);
         } catch (SQLException e) {
@@ -143,12 +142,12 @@ public class JdbcPhoenixIT extends FlinkContainer {
         List<List> rows = new ArrayList<>();
         for (int i = 1; i <= 100; i++) {
             rows.add(Arrays.asList(String.format("test_%s", i),
-                    i % 2 == 0,
-                    Double.valueOf(i + 1),
-                    Float.valueOf(i + 2),
-                    (short) (i + 3),
-                    Integer.valueOf(i + 4),
-                    i + 5
+                i % 2 == 0,
+                Double.valueOf(i + 1),
+                Float.valueOf(i + 2),
+                (short) (i + 3),
+                Integer.valueOf(i + 4),
+                i + 5
             ));
         }
         return rows;
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index bee525373..eac403996 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
@@ -45,14 +46,15 @@ import java.util.stream.Stream;
 public class JdbcSourceToConsoleIT extends FlinkContainer {
     private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
     private PostgreSQLContainer<?> psl;
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
-                .withNetwork(NETWORK)
-                .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withNetwork(NETWORK)
+            .withNetworkAliases("postgresql")
+            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
         Startables.deepStart(Stream.of(psl)).join();
         LOGGER.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
@@ -69,8 +71,8 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
         try (Connection connection = DriverManager.getConnection(psl.getJdbcUrl(), psl.getUsername(), psl.getPassword())) {
             Statement statement = connection.createStatement();
             String sql = "CREATE TABLE test (\n" +
-                    "  name varchar(255) NOT NULL\n" +
-                    ")";
+                "  name varchar(255) NOT NULL\n" +
+                ")";
             statement.execute(sql);
         } catch (SQLException e) {
             throw new RuntimeException("Initializing PostgreSql table failed!", e);
@@ -106,4 +108,10 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
             psl.stop();
         }
     }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
index 9845c3c20..ebe7847c1 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java
@@ -53,7 +53,7 @@ public class JdbcDmdbIT extends SparkContainer {
     private static final String DRIVER_CLASS = "dm.jdbc.driver.DmDriver";
     private static final String HOST = "spark_e2e_dmdb";
     private static final String LOCAL_HOST = "localhost";
-    private static final String URL = "jdbc:dm://" + LOCAL_HOST + ":5236";
+    private static final String URL = "jdbc:dm://%s:5236";
     private static final String USERNAME = "SYSDBA";
     private static final String PASSWORD = "SYSDBA";
     private static final String DATABASE = "SYSDBA";
@@ -95,7 +95,8 @@ public class JdbcDmdbIT extends SparkContainer {
     }
 
     private void initializeJdbcConnection() throws SQLException {
-        jdbcConnection = DriverManager.getConnection(URL, USERNAME, PASSWORD);
+        jdbcConnection = DriverManager.getConnection(String.format(
+            URL, dbServer.getHost()), USERNAME, PASSWORD);
     }
 
     /**
@@ -124,8 +125,7 @@ public class JdbcDmdbIT extends SparkContainer {
     }
 
     private void assertHasData(String table) {
-        try (Connection connection = DriverManager.getConnection(URL, USERNAME, PASSWORD)) {
-            Statement statement = connection.createStatement();
+        try (Statement statement = jdbcConnection.createStatement();) {
             String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
             ResultSet source = statement.executeQuery(sql);
             Assertions.assertTrue(source.next());
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
index b11910eee..61aa4b12a 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
@@ -51,14 +51,14 @@ public class JdbcGreenplumIT extends SparkContainer {
     private static final String GREENPLUM_IMAGE = "datagrip/greenplum:6.8";
     private static final String GREENPLUM_CONTAINER_HOST = "spark_e2e_greenplum";
     private static final int GREENPLUM_CONTAINER_PORT = 5432;
-    private static final String GREENPLUM_HOST = "localhost";
     private static final int GREENPLUM_PORT = 5436;
     private static final String GREENPLUM_USER = "tester";
     private static final String GREENPLUM_PASSWORD = "pivotal";
     private static final String GREENPLUM_DRIVER = "org.postgresql.Driver";
-    private static final String GREENPLUM_JDBC_URL = String.format(
-            "jdbc:postgresql://%s:%s/testdb", GREENPLUM_HOST, GREENPLUM_PORT);
+    private static final String GREENPLUM_JDBC_URL = "jdbc:postgresql://%s:%s/testdb";
+
     private static final List<List> TEST_DATASET = generateTestDataset();
+    private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     private GenericContainer<?> greenplumServer;
     private Connection jdbcConnection;
@@ -66,11 +66,11 @@ public class JdbcGreenplumIT extends SparkContainer {
     @BeforeEach
     public void startGreenplumContainer() throws ClassNotFoundException, SQLException {
         greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
-                .withNetwork(NETWORK)
-                .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+            .withNetwork(NETWORK)
+            .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
         greenplumServer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
+            String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(greenplumServer)).join();
         log.info("Greenplum container started");
         // wait for Greenplum fully start
@@ -97,28 +97,29 @@ public class JdbcGreenplumIT extends SparkContainer {
             ResultSet resultSet = statement.executeQuery(sql);
             while (resultSet.next()) {
                 result.add(Arrays.asList(
-                        resultSet.getInt(1),
-                        resultSet.getString(2)));
+                    resultSet.getInt(1),
+                    resultSet.getString(2)));
             }
         }
         Assertions.assertIterableEquals(TEST_DATASET, result);
     }
 
     private void initializeJdbcConnection() throws SQLException {
-        jdbcConnection = DriverManager.getConnection(GREENPLUM_JDBC_URL,
-                GREENPLUM_USER, GREENPLUM_PASSWORD);
+        jdbcConnection = DriverManager.getConnection(String.format(
+            GREENPLUM_JDBC_URL, greenplumServer.getHost(), GREENPLUM_PORT),
+            GREENPLUM_USER, GREENPLUM_PASSWORD);
     }
 
     private void initializeJdbcTable() throws SQLException {
         try (Statement statement = jdbcConnection.createStatement()) {
             String createSource = "CREATE TABLE source (\n" +
-                    "age INT NOT NULL,\n" +
-                    "name VARCHAR(255) NOT NULL\n" +
-                    ")";
+                "age INT NOT NULL,\n" +
+                "name VARCHAR(255) NOT NULL\n" +
+                ")";
             String createSink = "CREATE TABLE sink (\n" +
-                    "age INT NOT NULL,\n" +
-                    "name VARCHAR(255) NOT NULL\n" +
-                    ")";
+                "age INT NOT NULL,\n" +
+                "name VARCHAR(255) NOT NULL\n" +
+                ")";
             statement.execute(createSource);
             statement.execute(createSink);
         }
@@ -158,4 +159,10 @@ public class JdbcGreenplumIT extends SparkContainer {
             jdbcConnection.close();
         }
     }
+
+    @Override
+    protected void executeExtraCommands(GenericContainer<?> container) throws IOException, InterruptedException {
+        Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL);
+        Assertions.assertEquals(0, extraCommands.getExitCode());
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
index 66814cc77..34e6e20fc 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
@@ -48,12 +48,11 @@ public class JdbcPhoenixIT extends SparkContainer {
     private static final String PHOENIX_DOCKER_IMAGE = "iteblog/hbase-phoenix-docker:1.0";
 
     private static final String PHOENIX_CONTAINER_HOST = "spark_e2e_phoenix_sink";
-    private static final String PHOENIX_HOST = "localhost";
 
     private static final int PHOENIX_PORT = 8763;
     private static final int PHOENIX_CONTAINER_PORT = 8765;
 
-    private static final String PHOENIX_CONNECT_URL = String.format("jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF", PHOENIX_HOST, PHOENIX_PORT);
+    private static final String PHOENIX_CONNECT_URL = "jdbc:phoenix:thin:url=http://%s:%s;serialization=PROTOBUF";
     private static final String PHOENIX_JDBC_DRIVER = "org.apache.phoenix.queryserver.client.Driver";
 
     private GenericContainer<?> phoenixServer;
@@ -63,11 +62,11 @@ public class JdbcPhoenixIT extends SparkContainer {
     @BeforeEach
     public void startPhoenixContainer() throws ClassNotFoundException, SQLException {
         phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
-                .withNetwork(NETWORK)
-                .withNetworkAliases(PHOENIX_CONTAINER_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+            .withNetwork(NETWORK)
+            .withNetworkAliases(PHOENIX_CONTAINER_HOST)
+            .withLogConsumer(new Slf4jLogConsumer(log));
         phoenixServer.setPortBindings(Lists.newArrayList(
-                String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
+            String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(phoenixServer)).join();
         initializeJdbcConnection();
         log.info("phoenix container started");
@@ -87,13 +86,13 @@ public class JdbcPhoenixIT extends SparkContainer {
             ResultSet resultSet = statement.executeQuery(sql);
             while (resultSet.next()) {
                 result.add(Arrays.asList(
-                        resultSet.getString(1),
-                        resultSet.getBoolean(2),
-                        resultSet.getDouble(3),
-                        resultSet.getFloat(4),
-                        resultSet.getShort(5),
-                        resultSet.getInt(6),
-                        resultSet.getInt(7)));
+                    resultSet.getString(1),
+                    resultSet.getBoolean(2),
+                    resultSet.getDouble(3),
+                    resultSet.getFloat(4),
+                    resultSet.getShort(5),
+                    resultSet.getInt(6),
+                    resultSet.getInt(7)));
             }
         }
         Assertions.assertIterableEquals(generateTestDataset(), result);
@@ -101,30 +100,30 @@ public class JdbcPhoenixIT extends SparkContainer {
 
     private void initializeJdbcConnection() throws SQLException, ClassNotFoundException {
         Class.forName(PHOENIX_JDBC_DRIVER);
-        connection = DriverManager.getConnection(PHOENIX_CONNECT_URL);
+        connection = DriverManager.getConnection(String.format(PHOENIX_CONNECT_URL, phoenixServer.getHost(), PHOENIX_PORT));
     }
 
     private void initializePhoenixTable() {
-        try  {
+        try {
             Statement statement = connection.createStatement();
             String createSource = "CREATE TABLE test.source (\n" +
-                    "\tf1 VARCHAR PRIMARY KEY,\n" +
-                    "\tf2 BOOLEAN,\n" +
-                    "\tf3 UNSIGNED_DOUBLE,\n" +
-                    "\tf4 UNSIGNED_FLOAT,\n" +
-                    "\tf5 UNSIGNED_SMALLINT,\n" +
-                    "\tf6 INTEGER,\n" +
-                    "\tf7 UNSIGNED_INT\n" +
-                    ")";
+                "\tf1 VARCHAR PRIMARY KEY,\n" +
+                "\tf2 BOOLEAN,\n" +
+                "\tf3 UNSIGNED_DOUBLE,\n" +
+                "\tf4 UNSIGNED_FLOAT,\n" +
+                "\tf5 UNSIGNED_SMALLINT,\n" +
+                "\tf6 INTEGER,\n" +
+                "\tf7 UNSIGNED_INT\n" +
+                ")";
             String createSink = "CREATE TABLE test.sink (\n" +
-                    "\tf1 VARCHAR PRIMARY KEY,\n" +
-                    "\tf2 BOOLEAN,\n" +
-                    "\tf3 UNSIGNED_DOUBLE,\n" +
-                    "\tf4 UNSIGNED_FLOAT,\n" +
-                    "\tf5 UNSIGNED_SMALLINT,\n" +
-                    "\tf6 INTEGER,\n" +
-                    "\tf7 UNSIGNED_INT\n" +
-                    ")";
+                "\tf1 VARCHAR PRIMARY KEY,\n" +
+                "\tf2 BOOLEAN,\n" +
+                "\tf3 UNSIGNED_DOUBLE,\n" +
+                "\tf4 UNSIGNED_FLOAT,\n" +
+                "\tf5 UNSIGNED_SMALLINT,\n" +
+                "\tf6 INTEGER,\n" +
+                "\tf7 UNSIGNED_INT\n" +
+                ")";
             statement.execute(createSource);
             statement.execute(createSink);
         } catch (SQLException e) {
@@ -143,12 +142,12 @@ public class JdbcPhoenixIT extends SparkContainer {
         List<List> rows = new ArrayList<>();
         for (int i = 1; i <= 100; i++) {
             rows.add(Arrays.asList(String.format("test_%s", i),
-                    i % 2 == 0,
-                    Double.valueOf(i + 1),
-                    Float.valueOf(i + 2),
-                    (short) (i + 3),
-                    Integer.valueOf(i + 4),
-                    i + 5
+                i % 2 == 0,
+                Double.valueOf(i + 1),
+                Float.valueOf(i + 2),
+                (short) (i + 3),
+                Integer.valueOf(i + 4),
+                i + 5
             ));
         }
         return rows;
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 529049455..8fffecddd 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -64,7 +64,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     };
 
     protected final ConcurrentHashMap<PluginIdentifier, Optional<URL>> pluginJarPath =
-            new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
+        new ConcurrentHashMap<>(Common.COLLECTION_SIZE);
 
     public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
         this.pluginDir = Common.connectorJarDir(pluginSubDir);
@@ -80,10 +80,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     @Override
     public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
         return pluginIdentifiers.stream()
-                .map(this::getPluginJarPath)
-                .filter(Optional::isPresent)
-                .map(Optional::get).distinct()
-                .collect(Collectors.toList());
+            .map(this::getPluginJarPath)
+            .filter(Optional::isPresent)
+            .map(Optional::get).distinct()
+            .collect(Collectors.toList());
     }
 
     @Override
@@ -109,13 +109,13 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
                 this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
             } catch (Exception e) {
                 LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
-                        " message: " + e.getMessage());
+                    " message: " + e.getMessage());
                 classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
             }
             pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
             if (pluginInstance != null) {
                 LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
-                        pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
+                    pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
                 return pluginInstance;
             }
         }