You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/09 14:28:49 UTC

[incubator-seatunnel] branch dev updated: [Improve][E2E] Improve testcase output log style (#3028)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 76b7d31c5 [Improve][E2E] Improve testcase output log style (#3028)
76b7d31c5 is described below

commit 76b7d31c59d8e5cc706f3f770a1b4ddf6fba8354
Author: hailin0 <wa...@apache.org>
AuthorDate: Sun Oct 9 22:28:44 2022 +0800

    [Improve][E2E] Improve testcase output log style (#3028)
---
 .../seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java |  3 ++-
 .../seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java    |  7 +++++--
 .../apache/seatunnel/e2e/connector/redis/RedisIT.java   |  3 ++-
 .../e2e/common/container/AbstractTestContainer.java     | 14 ++++++++++++--
 .../container/flink/AbstractTestFlinkContainer.java     |  9 +++------
 .../container/spark/AbstractTestSparkContainer.java     |  6 ++----
 .../apache/seatunnel/engine/e2e/SeaTunnelContainer.java | 17 ++++++++++++++---
 .../apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java    |  3 ++-
 .../seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java |  6 ++++--
 .../seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java    |  3 ++-
 .../apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java |  6 ++++--
 .../seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java      |  3 ++-
 .../seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java     |  6 ++++--
 .../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java        |  6 ++++--
 .../seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java    |  6 ++++--
 .../seatunnel/e2e/flink/v2/mongodb/MongodbIT.java       |  3 ++-
 .../e2e/flink/clickhouse/FakeSourceToClickhouseIT.java  |  3 ++-
 .../apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java    |  3 ++-
 .../seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java    |  3 ++-
 .../apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java |  6 ++++--
 .../seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java      |  3 ++-
 .../seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java     |  6 ++++--
 .../seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java    |  6 ++++--
 .../seatunnel/e2e/spark/v2/mongodb/MongodbIT.java       |  3 ++-
 .../seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java    |  6 ++++--
 .../seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java |  6 ++++--
 26 files changed, 98 insertions(+), 48 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
index 47da09b8c..187a43db8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDmdbIT.java
@@ -40,6 +40,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -80,7 +81,7 @@ public class JdbcDmdbIT extends TestSuiteBase implements TestResource {
         dbServer = new GenericContainer<>(DOCKER_IMAGE)
                 .withNetwork(NETWORK)
                 .withNetworkAliases(HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         dbServer.setPortBindings(Lists.newArrayList(
                 String.format("%s:%s", 5236, 5236)));
         Startables.deepStart(Stream.of(dbServer)).join();
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
index 60746e25b..8b7830971 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -36,6 +36,7 @@ import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import javax.sql.XADataSource;
 import javax.transaction.xa.XAException;
@@ -48,6 +49,8 @@ import java.util.stream.Stream;
 @Disabled("Temporary fast fix, reason: JdbcDatabaseContainer: ClassNotFoundException: com.mysql.jdbc.Driver")
 class XaGroupOpsImplIT {
 
+    private static final String MYSQL_DOCKER_IMAGE = "mysql:8.0.29";
+
     private MySQLContainer<?> mc;
     private XaGroupOps xaGroupOps;
     private SemanticXidGenerator xidGenerator;
@@ -58,9 +61,9 @@ class XaGroupOpsImplIT {
     @BeforeEach
     void before() throws Exception {
         // Non-root users need to grant XA_RECOVER_ADMIN permission
-        mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
+        mc = new MySQLContainer<>(DockerImageName.parse(MYSQL_DOCKER_IMAGE))
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MYSQL_DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(mc)).join();
 
         jdbcConnectionOptions = JdbcConnectionOptions.builder()
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
index 8e82fc8b4..2e0cb83b6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/java/org/apache/seatunnel/e2e/connector/redis/RedisIT.java
@@ -42,6 +42,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 import redis.clients.jedis.Jedis;
 
 import java.io.IOException;
@@ -76,7 +77,7 @@ public class RedisIT extends TestSuiteBase implements TestResource {
             .withNetwork(NETWORK)
             .withNetworkAliases(HOST)
             .withExposedPorts(PORT)
-            .withLogConsumer(new Slf4jLogConsumer(log))
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IMAGE)))
             .withCommand(String.format("redis-server --requirepass %s", PASSWORD))
             .waitingFor(new HostPortWaitStrategy()
                 .withStartupTimeout(Duration.ofMinutes(2)));
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
index acec24c3b..211838871 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java
@@ -102,8 +102,18 @@ public abstract class AbstractTestContainer implements TestContainer {
         command.addAll(getExtraStartShellCommands());
 
         Container.ExecResult execResult = container.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
+        if (execResult.getStdout() != null && execResult.getStdout().length() > 0) {
+            LOG.info("\n==================== ExecuteConfigFile: {} STDOUT start ====================\n"
+                    + "{}"
+                    + "\n==================== ExecuteConfigFile: {} STDOUT end   ====================",
+                configPath, execResult.getStdout(), configPath);
+        }
+        if (execResult.getStderr() != null && execResult.getStderr().length() > 0) {
+            LOG.error("\n==================== ExecuteConfigFile: {} STDERR start ====================\n"
+                    + "{}"
+                    + "\n==================== ExecuteConfigFile: {} STDERR end   ====================",
+                configPath, execResult.getStderr(), configPath);
+        }
         return execResult;
     }
 }
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 07629c4ac..0920c95fa 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -22,12 +22,11 @@ import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 
 import lombok.NoArgsConstructor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -43,8 +42,6 @@ import java.util.stream.Stream;
 @NoArgsConstructor
 public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(AbstractTestFlinkContainer.class);
-
     protected static final List<String> DEFAULT_FLINK_PROPERTIES = Arrays.asList(
         "jobmanager.rpc.address: jobmanager",
         "taskmanager.numberOfTaskSlots: 10",
@@ -71,7 +68,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
             .withNetworkAliases("jobmanager")
             .withExposedPorts()
             .withEnv("FLINK_PROPERTIES", properties)
-            .withLogConsumer(new Slf4jLogConsumer(LOG));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":jobmanager")));
 
         taskManager =
             new GenericContainer<>(dockerImage)
@@ -80,7 +77,7 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
                 .withNetworkAliases("taskmanager")
                 .withEnv("FLINK_PROPERTIES", properties)
                 .dependsOn(jobManager)
-                .withLogConsumer(new Slf4jLogConsumer(LOG));
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(dockerImage + ":taskmanager")));
 
         Startables.deepStart(Stream.of(jobManager)).join();
         Startables.deepStart(Stream.of(taskManager)).join();
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
index 4f3118ab3..4ed27a369 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/spark/AbstractTestSparkContainer.java
@@ -20,12 +20,11 @@ package org.apache.seatunnel.e2e.common.container.spark;
 import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -34,7 +33,6 @@ import java.util.stream.Stream;
 
 public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
 
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractTestSparkContainer.class);
     private static final String DEFAULT_DOCKER_IMAGE = "bitnami/spark:2.4.6";
 
     protected GenericContainer<?> master;
@@ -51,7 +49,7 @@ public abstract class AbstractTestSparkContainer extends AbstractTestContainer {
             .withNetworkAliases("spark-master")
             .withExposedPorts()
             .withEnv("SPARK_MODE", "master")
-            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(getDockerImage())))
             .withCreateContainerCmdModifier(cmd -> cmd.withUser("root"));
         // In most case we can just use standalone mode to execute a spark job, if we want to use cluster mode, we need to
         // start a worker.
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
index d0d9e65e4..4c3938a40 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
@@ -28,6 +28,7 @@ import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
 import java.io.File;
@@ -64,7 +65,7 @@ public abstract class SeaTunnelContainer {
             .withCommand(Paths.get(SEATUNNEL_BIN, SERVER_SHELL).toString())
             .withNetworkAliases("server")
             .withExposedPorts()
-            .withLogConsumer(new Slf4jLogConsumer(LOG))
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger("seatunnel-engine:" + JDK_DOCKER_IMAGE)))
             .waitingFor(Wait.forLogMessage(".*received new worker register.*\\n", 1));
         mountMapping.forEach(SERVER::withFileSystemBind);
         SERVER.start();
@@ -113,8 +114,18 @@ public abstract class SeaTunnelContainer {
             }
         });
         Container.ExecResult execResult = SERVER.execInContainer("bash", "-c", String.join(" ", command));
-        LOG.info(execResult.getStdout());
-        LOG.error(execResult.getStderr());
+        if (execResult.getStdout() != null && execResult.getStdout().length() > 0) {
+            LOG.info("\n==================== ExecuteConfigFile: {} STDOUT start ====================\n"
+                    + "{}"
+                    + "\n==================== ExecuteConfigFile: {} STDOUT end   ====================",
+                confFile, execResult.getStdout(), confFile);
+        }
+        if (execResult.getStderr() != null && execResult.getStderr().length() > 0) {
+            LOG.error("\n==================== ExecuteConfigFile: {} STDERR start ====================\n"
+                    + "{}"
+                    + "\n==================== ExecuteConfigFile: {} STDERR end   ====================",
+                confFile, execResult.getStderr(), confFile);
+        }
         return execResult;
     }
 
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java
index 29dde429f..be1067004 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iotdb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iotdb/IoTDBIT.java
@@ -41,6 +41,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,7 +72,7 @@ public class IoTDBIT extends FlinkContainer {
         iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases(IOTDB_HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IOTDB_DOCKER_IMAGE)));
         iotdbServer.setPortBindings(Lists.newArrayList(
             String.format("%s:6667", IOTDB_PORT)));
         Startables.deepStart(Stream.of(iotdbServer)).join();
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index 12f4b2a06..89395062e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -33,6 +33,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -46,16 +47,17 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class FakeSourceToJdbcIT extends FlinkContainer {
+    private static final String DOCKER_IMAGE = "postgres:alpine3.16";
     private PostgreSQLContainer<?> psl;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
-        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+        psl = new PostgreSQLContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(psl)).join();
         log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
index 9446f5ce5..50df0f760 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcGreenplumIT.java
@@ -31,6 +31,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -67,7 +68,7 @@ public class JdbcGreenplumIT extends FlinkContainer {
         greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(GREENPLUM_IMAGE)));
         greenplumServer.setPortBindings(Lists.newArrayList(
             String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(greenplumServer)).join();
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
index 2d0a75e1e..42328cee0 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
@@ -36,6 +36,7 @@ import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -58,6 +59,7 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcMysqlIT extends FlinkContainer {
+    private static final String DOCKER_IMAGE = "bitnami/mysql:8.0.29";
     private MySQLContainer<?> mc;
     private Config config;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -66,11 +68,11 @@ public class JdbcMysqlIT extends FlinkContainer {
     @BeforeEach
     public void startMySqlContainer() throws Exception {
         // Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
-        mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
+        mc = new MySQLContainer<>(DockerImageName.parse(DOCKER_IMAGE).asCompatibleSubstituteFor("mysql"))
             .withNetwork(NETWORK)
             .withNetworkAliases("mysql")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(mc)).join();
         log.info("Mysql container started");
         Class.forName(mc.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
index 0c6e87944..5cbe08ffd 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPhoenixIT.java
@@ -29,6 +29,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -65,7 +66,7 @@ public class JdbcPhoenixIT extends FlinkContainer {
         phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases(PHOENIX_CONTAINER_HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PHOENIX_DOCKER_IMAGE)));
         phoenixServer.setPortBindings(Lists.newArrayList(
             String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(phoenixServer)).join();
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
index f946b4ee9..505ef872c 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
@@ -32,6 +32,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -50,18 +51,19 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcPostgresIT extends FlinkContainer {
+    private static final String DOCKER_IMAGE = "postgres:14-alpine";
     private PostgreSQLContainer<?> pg;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws Exception {
-        pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
+        pg = new PostgreSQLContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
             .withCommand("postgres -c max_prepared_transactions=100")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(pg)).join();
         log.info("Postgres container started");
         Class.forName(pg.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index 64e09c85d..9418cd38f 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -32,6 +32,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -44,16 +45,17 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcSourceToConsoleIT extends FlinkContainer {
+    private static final String DOCKER_IMAGE = "postgres:alpine3.16";
     private PostgreSQLContainer<?> psl;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
-        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+        psl = new PostgreSQLContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(psl)).join();
         log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
index 85360a275..1aa725290 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
@@ -33,6 +33,7 @@ import org.testcontainers.shaded.com.google.common.collect.Lists;
 import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,16 +51,17 @@ import java.util.stream.Stream;
 @Slf4j
 public class JdbcSqlserverIT extends FlinkContainer {
 
+    private static final String DOCKER_IMAGE = "mcr.microsoft.com/mssql/server:2022-latest";
     private MSSQLServerContainer<?> mssqlServerContainer;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startSqlserverContainer() throws ClassNotFoundException, SQLException {
-        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("sqlserver")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(mssqlServerContainer)).join();
         log.info("Sqlserver container started");
         Class.forName(mssqlServerContainer.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
index e75555cbe..ecf8b2c21 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-mongodb-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/mongodb/MongodbIT.java
@@ -51,6 +51,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -90,7 +91,7 @@ public class MongodbIT extends FlinkContainer {
                 .forPort(MONGODB_PORT)
                 .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
                 .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
         Startables.deepStart(Stream.of(mongodbContainer)).join();
         log.info("Mongodb container started");
 
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
index fbd87ee9c..fe69e472c 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -31,6 +31,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 import ru.yandex.clickhouse.BalancedClickhouseDataSource;
 import ru.yandex.clickhouse.ClickHouseConnection;
 import ru.yandex.clickhouse.ClickHouseStatement;
@@ -55,7 +56,7 @@ public class FakeSourceToClickhouseIT extends FlinkContainer {
         clickhouseServer = new GenericContainer<>(CLICKHOUSE_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases("clickhouse")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(CLICKHOUSE_DOCKER_IMAGE)));
         clickhouseServer.setPortBindings(Lists.newArrayList("8123:8123"));
         Startables.deepStart(Stream.of(clickhouseServer)).join();
         log.info("Clickhouse container started");
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java
index 5082559e4..e5a76dfaf 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iotdb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iotdb/IoTDBIT.java
@@ -40,6 +40,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -70,7 +71,7 @@ public class IoTDBIT extends SparkContainer {
         iotdbServer = new GenericContainer<>(IOTDB_DOCKER_IMAGE)
                 .withNetwork(NETWORK)
                 .withNetworkAliases(IOTDB_HOST)
-                .withLogConsumer(new Slf4jLogConsumer(log));
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IOTDB_DOCKER_IMAGE)));
         iotdbServer.setPortBindings(Lists.newArrayList(
                 String.format("%s:6667", IOTDB_PORT)));
         Startables.deepStart(Stream.of(iotdbServer)).join();
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
index 00a19b7cc..1a5fc8955 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcGreenplumIT.java
@@ -31,6 +31,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -68,7 +69,7 @@ public class JdbcGreenplumIT extends SparkContainer {
         greenplumServer = new GenericContainer<>(GREENPLUM_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases(GREENPLUM_CONTAINER_HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(GREENPLUM_IMAGE)));
         greenplumServer.setPortBindings(Lists.newArrayList(
             String.format("%s:%s", GREENPLUM_PORT, GREENPLUM_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(greenplumServer)).join();
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
index 08eb08535..02449b6c6 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
@@ -36,6 +36,7 @@ import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
@@ -58,6 +59,7 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcMysqlIT extends SparkContainer {
+    private static final String DOCKER_IMAGE = "bitnami/mysql:8.0.29";
     private MySQLContainer<?> mc;
     private Config config;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -66,11 +68,11 @@ public class JdbcMysqlIT extends SparkContainer {
     @BeforeEach
     public void startMySqlContainer() throws Exception {
         // Non-root users need to grant XA_RECOVER_ADMIN permission on is_exactly_once = "true"
-        mc = new MySQLContainer<>(DockerImageName.parse("bitnami/mysql:8.0.29").asCompatibleSubstituteFor("mysql"))
+        mc = new MySQLContainer<>(DockerImageName.parse(DOCKER_IMAGE).asCompatibleSubstituteFor("mysql"))
             .withNetwork(NETWORK)
             .withNetworkAliases("mysql")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(mc)).join();
         log.info("Mysql container started");
         Class.forName(mc.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
index 201a2c9ad..540dc1bca 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPhoenixIT.java
@@ -29,6 +29,7 @@ import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -65,7 +66,7 @@ public class JdbcPhoenixIT extends SparkContainer {
         phoenixServer = new GenericContainer<>(PHOENIX_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases(PHOENIX_CONTAINER_HOST)
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(PHOENIX_DOCKER_IMAGE)));
         phoenixServer.setPortBindings(Lists.newArrayList(
             String.format("%s:%s", PHOENIX_PORT, PHOENIX_CONTAINER_PORT)));
         Startables.deepStart(Stream.of(phoenixServer)).join();
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
index 03770e8a5..79d9fda1b 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
@@ -32,6 +32,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -50,17 +51,18 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcPostgresIT extends SparkContainer {
+    private static final String DOCKER_IMAGE = "postgres:14-alpine";
     private PostgreSQLContainer<?> pg;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws Exception {
-        pg = new PostgreSQLContainer<>(DockerImageName.parse("postgres:14-alpine"))
+        pg = new PostgreSQLContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
             .withCommand("postgres -c max_prepared_transactions=100")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(pg)).join();
         log.info("Postgres container started");
         Class.forName(pg.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
index 053ea0b37..22b50f4ee 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
@@ -33,6 +33,7 @@ import org.testcontainers.shaded.com.google.common.collect.Lists;
 import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
 import org.testcontainers.shaded.org.awaitility.Awaitility;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -50,16 +51,17 @@ import java.util.stream.Stream;
 @Slf4j
 public class JdbcSqlserverIT extends SparkContainer {
 
+    private static final String DOCKER_IMAGE = "mcr.microsoft.com/mssql/server:2022-latest";
     private MSSQLServerContainer<?> mssqlServerContainer;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar";
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startSqlServerContainer() throws ClassNotFoundException, SQLException {
-        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+        mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse(DOCKER_IMAGE))
             .withNetwork(NETWORK)
             .withNetworkAliases("sqlserver")
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(mssqlServerContainer)).join();
         log.info("Sqlserver container started");
         Class.forName(mssqlServerContainer.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
index 17d672c33..dcabbd5d3 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-mongodb-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/mongodb/MongodbIT.java
@@ -51,6 +51,7 @@ import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -90,7 +91,7 @@ public class MongodbIT extends SparkContainer {
                 .forPort(MONGODB_PORT)
                 .forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
                 .withStartupTimeout(Duration.ofMinutes(2)))
-            .withLogConsumer(new Slf4jLogConsumer(log));
+            .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(MONGODB_IMAGE)));
         Startables.deepStart(Stream.of(mongodbContainer)).join();
         log.info("Mongodb container started");
 
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index 9051c17e7..00f08d254 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -32,6 +32,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -45,15 +46,16 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class FakeSourceToJdbcIT extends SparkContainer {
+    private static final String POSTGRES_DOCKER_IMAGE = "postgres:alpine3.16";
     private PostgreSQLContainer<?> psl;
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
-        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+        psl = new PostgreSQLContainer<>(DockerImageName.parse(POSTGRES_DOCKER_IMAGE))
                 .withNetwork(NETWORK)
                 .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(log));
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(POSTGRES_DOCKER_IMAGE)));
         Startables.deepStart(Stream.of(psl)).join();
         log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index eb999dad8..cae0485d5 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -32,6 +32,7 @@ import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
 import org.testcontainers.utility.DockerImageName;
+import org.testcontainers.utility.DockerLoggerFactory;
 
 import java.io.IOException;
 import java.sql.Connection;
@@ -44,15 +45,16 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcSourceToConsoleIT extends SparkContainer {
+    private static final String POSTGRES_DOCKER_IMAGE = "postgres:alpine3.16";
     private PostgreSQLContainer<?> psl;
 
     @SuppressWarnings("checkstyle:MagicNumber")
     @BeforeEach
     public void startPostgreSqlContainer() throws InterruptedException, ClassNotFoundException, SQLException {
-        psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
+        psl = new PostgreSQLContainer<>(DockerImageName.parse(POSTGRES_DOCKER_IMAGE))
                 .withNetwork(NETWORK)
                 .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(log));
+                .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(POSTGRES_DOCKER_IMAGE)));
         psl.setPortBindings(Lists.newArrayList("33306:3306"));
         Startables.deepStart(Stream.of(psl)).join();
         log.info("PostgreSql container started");