You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/08 02:12:48 UTC

[incubator-seatunnel] branch dev updated: [Improve][all] change Log to @Slf4j (#3001)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6016100f1 [Improve][all] change Log to @Slf4j (#3001)
6016100f1 is described below

commit 6016100f123cbf32f8b945155aef6b578be462ec
Author: liugddx <80...@qq.com>
AuthorDate: Sat Oct 8 10:12:42 2022 +0800

    [Improve][all] change Log to @Slf4j (#3001)
    
    * [Improve][all] change Log to @Slf4j
---
 .../apache/seatunnel/flink/FlinkEnvironment.java   | 12 ++++------
 .../seatunnel/flink/batch/FlinkBatchExecution.java | 12 ++++------
 .../flink/stream/FlinkStreamExecution.java         | 10 ++++----
 .../seatunnel/flink/util/EnvironmentUtil.java      | 10 ++++----
 .../apache/seatunnel/spark/SparkEnvironment.java   |  8 +++----
 .../sink/client/ClickhouseSinkWriter.java          |  6 ++---
 .../sink/file/ClickhouseFileSinkWriter.java        |  9 ++++----
 .../clickhouse/sink/file/RsyncFileTransfer.java    | 12 ++++------
 .../clickhouse/sink/file/ScpFileTransfer.java      |  8 +++----
 .../sink/ElasticsearchSinkWriter.java              |  9 +++-----
 .../seatunnel/email/sink/EmailSinkWriter.java      | 14 +++++------
 .../seatunnel/fake/source/FakeSourceReader.java    |  8 +++----
 .../sink/commit/FileSinkAggregatedCommitter.java   |  9 ++++----
 .../seatunnel/file/sink/util/FileSystemUtils.java  | 11 ++++-----
 .../seatunnel/http/sink/HttpSinkWriter.java        |  9 ++++----
 .../seatunnel/http/source/HttpSourceReader.java    | 11 ++++-----
 .../kafka/sink/KafkaInternalProducer.java          |  8 +++----
 .../seatunnel/kafka/sink/KafkaSinkCommitter.java   | 10 ++++----
 .../kafka/sink/KafkaTransactionSender.java         | 10 ++++----
 .../seatunnel/kafka/source/KafkaSourceReader.java  |  6 +----
 .../seatunnel/kudu/kuduclient/KuduInputFormat.java | 15 ++++++------
 .../kudu/kuduclient/KuduOutputFormat.java          | 16 ++++++-------
 .../seatunnel/kudu/sink/KuduSinkWriter.java        |  8 ++-----
 .../seatunnel/kudu/source/KuduSource.java          | 10 ++++----
 .../seatunnel/kudu/source/KuduSourceReader.java    |  8 +++----
 .../mongodb/source/MongodbSourceReader.java        |  8 +++----
 .../socket/source/SocketSourceReader.java          |  7 +++---
 .../sink/ClickhouseFileOutputFormat.java           |  9 ++++----
 .../seatunnel/flink/console/sink/ConsoleSink.java  |  7 +++---
 .../flink/doris/sink/DorisOutputFormat.java        |  7 +++---
 .../flink/doris/sink/DorisStreamLoad.java          |  9 ++++----
 .../flink/druid/sink/DruidOutputFormat.java        |  7 +++---
 .../seatunnel/flink/druid/source/DruidSource.java  |  8 +++----
 .../sink/ElasticsearchOutputFormat.java            |  7 +++---
 .../sink/ElasticsearchOutputFormat.java            |  7 +++---
 .../apache/seatunnel/flink/file/sink/FileSink.java |  8 +++----
 .../apache/seatunnel/flink/jdbc/sink/JdbcSink.java | 17 +++++++-------
 .../seatunnel/flink/jdbc/source/JdbcSource.java    |  9 ++++----
 .../flink/kafka/source/KafkaTableStream.java       |  7 +++---
 .../org/apache/seatunnel/core/base/Seatunnel.java  | 27 +++++++++++-----------
 .../core/base/command/BaseTaskExecuteCommand.java  | 16 ++++++-------
 .../seatunnel/core/base/config/ConfigBuilder.java  | 11 ++++-----
 .../core/base/config/ExecutionFactory.java         |  8 +++----
 .../seatunnel/core/base/utils/AsciiArtUtils.java   |  8 +++----
 .../core/base/utils/CompressionUtils.java          | 22 ++++++++----------
 .../core/sql/classloader/CustomClassLoader.java    |  8 +++----
 .../apache/seatunnel/core/sql/job/Executor.java    | 14 +++++------
 .../flink/command/FlinkApiConfValidateCommand.java |  8 +++----
 .../spark/command/SparkConfValidateCommand.java    |  8 +++----
 .../apache/seatunnel/core/starter/Seatunnel.java   | 27 +++++++++++-----------
 .../core/starter/config/ConfigBuilder.java         | 11 ++++-----
 .../seatunnel/core/starter/config/EngineType.java  |  3 +--
 .../core/starter/utils/AsciiArtUtils.java          |  8 +++----
 .../core/starter/utils/CompressionUtils.java       | 22 ++++++++----------
 .../flink/command/FlinkApiConfValidateCommand.java |  8 +++----
 .../flink/command/FlinkApiTaskExecuteCommand.java  |  6 ++---
 .../starter/flink/execution/FlinkExecution.java    |  8 +++----
 .../spark/command/SparkApiConfValidateCommand.java |  9 ++++----
 .../spark/command/SparkApiTaskExecuteCommand.java  |  8 +++----
 .../starter/spark/execution/SparkExecution.java    |  7 +++---
 .../seatunnel/command/ApiConfValidateCommand.java  |  8 +++----
 .../jdbc/internal/xa/XaGroupOpsImplIT.java         |  7 +++---
 .../seatunnel/engine/e2e/JobExecutionIT.java       |  6 ++---
 .../e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java      |  9 ++++----
 .../seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java   |  9 ++++----
 .../e2e/flink/v2/jdbc/JdbcPostgresIT.java          |  7 ++----
 .../e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java   |  9 ++++----
 .../flink/clickhouse/FakeSourceToClickhouseIT.java | 10 ++++----
 .../e2e/flink/file/FakeSourceToFileIT.java         |  6 ++---
 .../seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java   |  9 ++++----
 .../e2e/spark/v2/jdbc/JdbcPostgresIT.java          |  9 ++++----
 .../e2e/spark/jdbc/FakeSourceToJdbcIT.java         |  9 ++++----
 .../e2e/spark/jdbc/JdbcSourceToConsoleIT.java      |  9 ++++----
 seatunnel-engine/seatunnel-engine-common/pom.xml   |  4 ++--
 .../plugin/discovery/AbstractPluginDiscovery.java  | 19 ++++++++-------
 .../org/apache/seatunnel/flink/transform/UDF.java  | 11 ++++-----
 .../translation/flink/sink/FlinkCommitter.java     |  8 +++----
 .../flink/sink/FlinkGlobalCommitter.java           |  8 +++----
 .../source/batch/ParallelBatchPartitionReader.java |  8 +++----
 79 files changed, 324 insertions(+), 454 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index a33b81930..826e33830 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.flink.util.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -46,8 +47,6 @@ import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.TernaryBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -55,10 +54,9 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class FlinkEnvironment implements RuntimeEnv {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkEnvironment.class);
-
     private Config config;
 
     private StreamExecutionEnvironment environment;
@@ -127,7 +125,7 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     @Override
     public void registerPlugin(List<URL> pluginPaths) {
-        pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
+        pluginPaths.forEach(url -> log.info("register plugins : {}", url));
         List<Configuration> configurations = new ArrayList<>();
         try {
             configurations.add((Configuration) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
@@ -248,7 +246,7 @@ public class FlinkEnvironment implements RuntimeEnv {
                     environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                     break;
                 default:
-                    LOGGER.warn(
+                    log.warn(
                         "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
                         timeType);
                     break;
@@ -272,7 +270,7 @@ public class FlinkEnvironment implements RuntimeEnv {
                         checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
                         break;
                     default:
-                        LOGGER.warn(
+                        log.warn(
                             "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
                             mode);
                         break;
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index a0ff40894..bb8060e38 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -23,22 +23,20 @@ import org.apache.seatunnel.flink.util.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+@Slf4j
 public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkBatchExecution.class);
-
     private Config config;
 
     private final FlinkEnvironment flinkEnvironment;
@@ -73,11 +71,11 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
 
         if (whetherExecute(sinks)) {
             try {
-                LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
+                log.info("Flink Execution Plan:{}", flinkEnvironment.getBatchEnvironment().getExecutionPlan());
                 JobExecutionResult execute = flinkEnvironment.getBatchEnvironment().execute(flinkEnvironment.getJobName());
-                LOGGER.info(execute.toString());
+                log.info(execute.toString());
             } catch (Exception e) {
-                LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+                log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
                 throw e;
             }
         }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
index d5fef8ed6..8672171cd 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/stream/FlinkStreamExecution.java
@@ -24,21 +24,19 @@ import org.apache.seatunnel.flink.util.TableUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 
+@Slf4j
 public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkStreamTransform, FlinkStreamSink, FlinkEnvironment> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStreamExecution.class);
-
     private Config config;
 
     private final FlinkEnvironment flinkEnvironment;
@@ -71,10 +69,10 @@ public class FlinkStreamExecution implements Execution<FlinkStreamSource, FlinkS
             sink.outputStream(flinkEnvironment, stream);
         }
         try {
-            LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+            log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
             flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
         } catch (Exception e) {
-            LOGGER.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
+            log.warn("Flink with job name [{}] execute failed", flinkEnvironment.getJobName());
             throw e;
         }
     }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
index e23e26fe1..768d9d75d 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/util/EnvironmentUtil.java
@@ -21,18 +21,16 @@ import org.apache.seatunnel.common.config.CheckResult;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 public final class EnvironmentUtil {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(EnvironmentUtil.class);
-
     private EnvironmentUtil() {
     }
 
@@ -58,11 +56,11 @@ public final class EnvironmentUtil {
                                 Time.of(delayInterval, TimeUnit.MILLISECONDS)));
                         break;
                     default:
-                        LOGGER.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
+                        log.warn("set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate", restartStrategy);
                 }
             }
         } catch (Exception e) {
-            LOGGER.warn("set restart.strategy in config '{}' exception", config, e);
+            log.warn("set restart.strategy in config '{}' exception", config, e);
         }
     }
 
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index b9758babe..b003f6c48 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -28,22 +28,20 @@ import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.Seconds;
 import org.apache.spark.streaming.StreamingContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.util.List;
 
+@Slf4j
 public class SparkEnvironment implements RuntimeEnv {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkEnvironment.class);
-
     private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
 
     private SparkConf sparkConf;
@@ -92,7 +90,7 @@ public class SparkEnvironment implements RuntimeEnv {
 
     @Override
     public void registerPlugin(List<URL> pluginPaths) {
-        LOGGER.info("register plugins :" + pluginPaths);
+        log.info("register plugins :" + pluginPaths);
         // TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor it to
         //  support submit multi-jar in code or remove this logic.
         // this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(",")));
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index cc38ca0c0..27359ee32 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -39,9 +39,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.tool.IntHolder;
 
 import com.clickhouse.jdbc.internal.ClickHouseConnectionImpl;
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.sql.PreparedStatement;
@@ -54,10 +53,9 @@ import java.util.Optional;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+@Slf4j
 public class ClickhouseSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSinkWriter.class);
-
     private final Context context;
     private final ReaderOption option;
     private final ShardRouter shardRouter;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index c19fdae34..ac5900915 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -30,9 +30,8 @@ import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSink
 import com.clickhouse.client.ClickHouseException;
 import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -55,8 +54,8 @@ import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
     private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/seatunnel-file";
     private static final int UUID_LENGTH = 10;
     private final FileReaderOption readerOption;
@@ -178,7 +177,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKComm
                 uuid));
         command.add("--path");
         command.add("\"" + clickhouseLocalFile + "\"");
-        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        log.info("Generate clickhouse local file command: {}", String.join(" ", command));
         ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
         Process start = processBuilder.start();
         // we just wait for the process to finish
@@ -187,7 +186,7 @@ public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKComm
              BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
             String line;
             while ((line = bufferedReader.readLine()) != null) {
-                LOGGER.info(line);
+                log.info(line);
             }
         }
         start.waitFor();
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
index 9d1a888c2..4dd8bc766 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
@@ -17,11 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -30,10 +29,9 @@ import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 public class RsyncFileTransfer implements FileTransfer {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(RsyncFileTransfer.class);
-
     private static final int SSH_PORT = 22;
 
     private final String host;
@@ -84,7 +82,7 @@ public class RsyncFileTransfer implements FileTransfer {
             rsyncCommand.add(sshParameter);
             rsyncCommand.add(sourcePath);
             rsyncCommand.add(String.format("root@%s:%s", host, targetPath));
-            LOGGER.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
+            log.info("Generate rsync command: {}", String.join(" ", rsyncCommand));
             ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", rsyncCommand));
             Process start = processBuilder.start();
             // we just wait for the process to finish
@@ -93,7 +91,7 @@ public class RsyncFileTransfer implements FileTransfer {
                  BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
                 String line;
                 while ((line = bufferedReader.readLine()) != null) {
-                    LOGGER.info(line);
+                    log.info(line);
                 }
             }
             start.waitFor();
@@ -110,7 +108,7 @@ public class RsyncFileTransfer implements FileTransfer {
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
-            LOGGER.info("execute remote command: " + finalCommand);
+            log.info("execute remote command: " + finalCommand);
             clientSession.executeRemoteCommand(finalCommand);
         } catch (IOException e) {
             // always return error cause xargs return shell command result
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
index 53347457d..1c207276a 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -17,22 +17,20 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
 import org.apache.sshd.client.session.ClientSession;
 import org.apache.sshd.scp.client.ScpClient;
 import org.apache.sshd.scp.client.ScpClientCreator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 public class ScpFileTransfer implements FileTransfer {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
-
     private static final int SCP_PORT = 22;
 
     private final String host;
@@ -90,7 +88,7 @@ public class ScpFileTransfer implements FileTransfer {
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
-            LOGGER.info("execute remote command: " + finalCommand);
+            log.info("execute remote command: " + finalCommand);
             clientSession.executeRemoteCommand(finalCommand);
         } catch (IOException e) {
             // always return error cause xargs return shell command result
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 8754d7567..6a2c32cd6 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -33,8 +33,7 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.Elasticsear
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -44,6 +43,7 @@ import java.util.Optional;
 /**
  * ElasticsearchSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Elasticsearch.
  */
+@Slf4j
 public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWriter<SeaTunnelRow, ElasticsearchCommitInfo, ElasticsearchSinkStateT> {
 
     private final SinkWriter.Context context;
@@ -52,9 +52,6 @@ public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWri
     private final List<String> requestEsList;
     private EsRestClient esRestClient;
 
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchSinkWriter.class);
-
     public ElasticsearchSinkWriter(
             SinkWriter.Context context,
             SeaTunnelRowType seaTunnelRowType,
@@ -115,7 +112,7 @@ public class ElasticsearchSinkWriter<ElasticsearchSinkStateT> implements SinkWri
                     if (tryCnt == maxRetry) {
                         throw new BulkElasticsearchException("bulk es error,try count=%d", ex);
                     }
-                    LOGGER.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
+                    log.warn(String.format("bulk es error,try count=%d", tryCnt), ex);
                 }
 
             }
diff --git a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
index 1a86dce22..2832b7d40 100644
--- a/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-email/src/main/java/org/apache/seatunnel/connectors/seatunnel/email/sink/EmailSinkWriter.java
@@ -25,8 +25,7 @@ import org.apache.seatunnel.connectors.seatunnel.email.config.EmailSinkConfig;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.sun.mail.util.MailSSLSocketFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.activation.DataHandler;
 import javax.activation.DataSource;
@@ -48,10 +47,9 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.Properties;
 
+@Slf4j
 public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(EmailSinkWriter.class);
-
     private final SeaTunnelRowType seaTunnelRowType;
     private EmailSinkConfig config;
     private StringBuffer stringBuffer;
@@ -130,9 +128,9 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
 
             //   send a message
             Transport.send(message);
-            LOGGER.info("Sent message successfully....");
+            log.info("Sent message successfully....");
         } catch (Exception e) {
-            LOGGER.warn("send email Fail.", e);
+            log.warn("send email Fail.", e);
             throw new RuntimeException("send email Fail.", e);
         }
     }
@@ -148,9 +146,9 @@ public class EmailSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
             FileWriter fileWritter = new FileWriter(file.getName());
             fileWritter.write(data);
             fileWritter.close();
-            LOGGER.info("Create File successfully....");
+            log.info("Create File successfully....");
         } catch (IOException e) {
-            LOGGER.warn("Create File Fail.", e);
+            log.warn("Create File Fail.", e);
             throw new RuntimeException("Create File Fail.", e);
         }
 
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index b847838c0..2dbec493b 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -23,15 +23,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.List;
 
+@Slf4j
 public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
-
     private final SingleSplitReaderContext context;
 
     private final FakeDataGenerator fakeDataGenerator;
@@ -61,7 +59,7 @@ public class FakeSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
         }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
             // signal to the source that we have reached the end of the data.
-            LOGGER.info("Closed the bounded fake source");
+            log.info("Closed the bounded fake source");
             context.signalNoMoreElement();
         }
         Thread.sleep(1000L);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 06e8a4e5a..de3354642 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -20,8 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.commit;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,8 +28,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+@Slf4j
 public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<FileCommitInfo, FileAggregatedCommitInfo> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileSinkAggregatedCommitter.class);
 
     @Override
     public List<FileAggregatedCommitInfo> commit(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws IOException {
@@ -46,7 +45,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
                     FileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
-                LOGGER.error("commit aggregatedCommitInfo error ", e);
+                log.error("commit aggregatedCommitInfo error ", e);
                 errorAggregatedCommitInfoList.add(aggregatedCommitInfo);
             }
         });
@@ -100,7 +99,7 @@ public class FileSinkAggregatedCommitter implements SinkAggregatedCommitter<File
                     FileSystemUtils.deleteFile(entry.getKey());
                 }
             } catch (Exception e) {
-                LOGGER.error("abort aggregatedCommitInfo error ", e);
+                log.error("abort aggregatedCommitInfo error ", e);
             }
         });
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index a788d6222..c37874b12 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -18,13 +18,12 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -32,8 +31,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 public class FileSystemUtils {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemUtils.class);
 
     public static final int WRITE_BUFFER_SIZE = 2048;
 
@@ -79,14 +78,14 @@ public class FileSystemUtils {
      */
     public static void renameFile(@NonNull String oldName, @NonNull String newName, boolean rmWhenExist) throws IOException {
         FileSystem fileSystem = getFileSystem(newName);
-        LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
+        log.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
 
         Path oldPath = new Path(oldName);
         Path newPath = new Path(newName);
         if (rmWhenExist) {
             if (fileExist(newName) && fileExist(oldName)) {
                 fileSystem.delete(newPath, true);
-                LOGGER.info("Delete already file: {}", newPath);
+                log.info("Delete already file: {}", newPath);
             }
         }
         if (!fileExist(newName.substring(0, newName.lastIndexOf("/")))) {
@@ -94,7 +93,7 @@ public class FileSystemUtils {
         }
 
         if (fileSystem.rename(oldPath, newPath)) {
-            LOGGER.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
+            log.info("rename file :[" + oldPath + "] to [" + newPath + "] finish");
         } else {
             throw new IOException("rename file :[" + oldPath + "] to [" + newPath + "] error");
         }
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
index 2def75ba7..d6980e6a9 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/sink/HttpSinkWriter.java
@@ -26,14 +26,13 @@ import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Objects;
 
+@Slf4j
 public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSinkWriter.class);
     protected final HttpClientProvider httpClient;
     protected final SeaTunnelRowType seaTunnelRowType;
     protected final HttpParameter httpParameter;
@@ -62,9 +61,9 @@ public class HttpSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
             if (HttpResponse.STATUS_OK == response.getCode()) {
                 return;
             }
-            LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
+            log.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         }
     }
 
diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
index 0419e2ec3..1582c6442 100644
--- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
+++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java
@@ -28,14 +28,13 @@ import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
 import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
 
 import com.google.common.base.Strings;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 import java.util.Objects;
 
+@Slf4j
 public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceReader.class);
     protected final SingleSplitReaderContext context;
     protected final HttpParameter httpParameter;
     protected HttpClientProvider httpClient;
@@ -70,13 +69,13 @@ public class HttpSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
                 }
                 return;
             }
-            LOGGER.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
+            log.error("http client execute exception, http response status code:[{}], content:[{}]", response.getCode(), response.getContent());
         } catch (Exception e) {
-            LOGGER.error(e.getMessage(), e);
+            log.error(e.getMessage(), e);
         } finally {
             if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
                 // signal to the source that we have reached the end of the data.
-                LOGGER.info("Closed the bounded http source");
+                log.info("Closed the bounded http source");
                 context.signalNoMoreElement();
             } else {
                 if (httpParameter.getPollIntervalMillis() > 0) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
index 77d2535f5..688e1284b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -19,11 +19,10 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
 import org.apache.seatunnel.common.utils.ReflectionUtils;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.common.errors.ProducerFencedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
@@ -34,10 +33,9 @@ import java.util.Properties;
 /**
  * A {@link KafkaProducer} that allow resume transaction from transactionId
  */
+@Slf4j
 public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInternalProducer.class);
-
     private static final String TRANSACTION_MANAGER_STATE_ENUM =
             "org.apache.kafka.clients.producer.internals.TransactionManager$State";
     private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
@@ -97,7 +95,7 @@ public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
 
     public void resumeTransaction(long producerId, short epoch) {
 
-        LOGGER.info(
+        log.info(
                 "Attempting to resume transaction {} with producerId {} and epoch {}",
                 transactionalId,
                 producerId,
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index aafa0bd4f..8e6740e03 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -22,18 +22,16 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Properties;
 
+@Slf4j
 public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSinkCommitter.class);
-
     private final Config pluginConfig;
 
     private KafkaInternalProducer<?, ?> kafkaProducer;
@@ -49,8 +47,8 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
         }
         for (KafkaCommitInfo commitInfo : commitInfos) {
             String transactionId = commitInfo.getTransactionId();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Committing transaction {}", transactionId);
+            if (log.isDebugEnabled()) {
+                log.debug("Committing transaction {}", transactionId);
             }
             KafkaProducer<?, ?> producer = getProducer(commitInfo);
             producer.commitTransaction();
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index ee8762ac4..a3eaba00b 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -23,11 +23,10 @@ import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Optional;
@@ -39,10 +38,9 @@ import java.util.Properties;
  * @param <K> key type.
  * @param <V> value type.
  */
+@Slf4j
 public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTransactionSender.class);
-
     private KafkaInternalProducer<K, V> kafkaProducer;
     private String transactionId;
     private final String transactionPrefix;
@@ -91,8 +89,8 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
         for (long i = checkpointId; ; i++) {
             String transactionId = generateTransactionId(this.transactionPrefix, i);
             producer.setTransactionalId(transactionId);
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Abort kafka transaction: {}", transactionId);
+            if (log.isDebugEnabled()) {
+                log.debug("Abort kafka transaction: {}", transactionId);
             }
             producer.flush();
             if (producer.getEpoch() == 0) {
diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index dc9c62093..bb5486956 100644
--- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -31,8 +31,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -54,8 +52,6 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
     private static final long THREAD_WAIT_TIME = 500L;
     private static final long POLL_TIMEOUT = 10000L;
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSourceReader.class);
-
     private final SourceReader.Context context;
     private final ConsumerMetadata metadata;
     private final Set<KafkaSourceSplit> sourceSplits;
@@ -171,7 +167,7 @@ public class KafkaSourceReader implements SourceReader<SeaTunnelRow, KafkaSource
 
     @Override
     public void handleNoMoreSplits() {
-        LOGGER.info("receive no more splits message, this reader will not add new split.");
+        log.info("receive no more splits message, this reader will not add new split.");
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index d00cc2bd3..cb8a59907 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.ExceptionUtil;
 import org.apache.seatunnel.common.constants.PluginType;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
@@ -33,8 +34,6 @@ import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduPredicate;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.RowResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -44,8 +43,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+@Slf4j
 public class KuduInputFormat implements Serializable {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KuduInputFormat.class);
 
     public KuduInputFormat(String kuduMaster, String tableName, String columnsList) {
         this.kuduMaster = kuduMaster;
@@ -80,7 +79,7 @@ public class KuduInputFormat implements Serializable {
             keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
             columns = schema.getColumns();
         } catch (KuduException e) {
-            LOGGER.warn("get table Columns Schemas Fail.", e);
+            log.warn("get table Columns Schemas Fail.", e);
             throw new RuntimeException("get table Columns Schemas Fail..", e);
         }
         return columns;
@@ -136,7 +135,7 @@ public class KuduInputFormat implements Serializable {
                 seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
             }
         } catch (Exception e) {
-            LOGGER.warn("get row type info exception.", e);
+            log.warn("get row type info exception.", e);
             throw new PrepareFailException("kudu", PluginType.SOURCE, ExceptionUtil.getMessage(e));
         }
         return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
@@ -149,7 +148,7 @@ public class KuduInputFormat implements Serializable {
 
         kuduClient = kuduClientBuilder.build();
 
-        LOGGER.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
+        log.info("The Kudu client is successfully initialized", kuduMaster, kuduClient);
 
     }
 
@@ -180,7 +179,7 @@ public class KuduInputFormat implements Serializable {
             kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
                     .addPredicate(upperPred).build();
         } catch (KuduException e) {
-            LOGGER.warn("get the Kuduscan object for each splice exception", e);
+            log.warn("get the Kuduscan object for each splice exception", e);
             throw new RuntimeException("get the Kuduscan object for each splice exception.", e);
         }
         return kuduScanner;
@@ -191,7 +190,7 @@ public class KuduInputFormat implements Serializable {
             try {
                 kuduClient.close();
             } catch (KuduException e) {
-                LOGGER.warn("Kudu Client close failed.", e);
+                log.warn("Kudu Client close failed.", e);
                 throw new RuntimeException("Kudu Client close failed.", e);
             } finally {
                 kuduClient = null;
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index 909940f49..c356baabf 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.Insert;
@@ -30,8 +31,6 @@ import org.apache.kudu.client.KuduTable;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.SessionConfiguration;
 import org.apache.kudu.client.Upsert;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -41,11 +40,10 @@ import java.sql.Timestamp;
 /**
  * A Kudu outputFormat
  */
+@Slf4j
 public class KuduOutputFormat
         implements Serializable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KuduOutputFormat.class);
-
     public static final long TIMEOUTMS = 18000;
     public static final long SESSIONTIMEOUTMS = 100000;
 
@@ -130,7 +128,7 @@ public class KuduOutputFormat
         try {
             kuduSession.apply(upsert);
         } catch (KuduException e) {
-            LOGGER.error("Failed to upsert.", e);
+            log.error("Failed to upsert.", e);
             throw new RuntimeException("Failed to upsert.", e);
         }
     }
@@ -143,7 +141,7 @@ public class KuduOutputFormat
         try {
             kuduSession.apply(insert);
         } catch (KuduException e) {
-            LOGGER.error("Failed to insert.", e);
+            log.error("Failed to insert.", e);
             throw new RuntimeException("Failed to insert.", e);
         }
     }
@@ -172,10 +170,10 @@ public class KuduOutputFormat
         try {
             kuduTable = kuduClient.openTable(kuduTableName);
         } catch (KuduException e) {
-            LOGGER.error("Failed to initialize the Kudu client.", e);
+            log.error("Failed to initialize the Kudu client.", e);
             throw new RuntimeException("Failed to initialize the Kudu client.", e);
         }
-        LOGGER.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
+        log.info("The Kudu client for Master: {} is initialized successfully.", kuduMaster);
     }
 
     public void closeOutputFormat() {
@@ -184,7 +182,7 @@ public class KuduOutputFormat
                 kuduClient.close();
                 kuduSession.close();
             } catch (KuduException ignored) {
-                LOGGER.warn("Failed to close Kudu Client.", ignored);
+                log.warn("Failed to close Kudu Client.", ignored);
             } finally {
                 kuduClient = null;
                 kuduSession = null;
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
index d78f48618..994321c8a 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/sink/KuduSinkWriter.java
@@ -26,20 +26,16 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduOutputForma
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.NonNull;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
 
+@Slf4j
 public class KuduSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSinkWriter.class);
 
     private SeaTunnelRowType seaTunnelRowType;
     private Config pluginConfig;
-
-
     private KuduOutputFormat fileWriter;
-
     private KuduSinkConfig kuduSinkConfig;
 
     public KuduSinkWriter(@NonNull SeaTunnelRowType seaTunnelRowType,
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index c10796a21..db11bac07 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -36,22 +36,20 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduException;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 @AutoService(SeaTunnelSource.class)
 public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit, KuduSourceState> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSource.class);
-
     private SeaTunnelRowType rowTypeInfo;
     private KuduInputFormat kuduInputFormat;
     private PartitionParameter partitionParameter;
@@ -85,7 +83,7 @@ public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
 
     @Override
     public SourceSplitEnumerator<KuduSourceSplit, KuduSourceState> restoreEnumerator(
-            SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
+           SourceSplitEnumerator.Context<KuduSourceSplit> enumeratorContext, KuduSourceState checkpointState) {
         // todo:
         return new KuduSourceSplitEnumerator(enumeratorContext, partitionParameter);
     }
@@ -176,7 +174,7 @@ public class KuduSource implements SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
             }
 
         } catch (Exception e) {
-            LOGGER.warn("get row type info exception", e);
+            log.warn("get row type info exception", e);
             throw new PrepareFailException("kudu", PluginType.SOURCE, e.toString());
         }
         return new SeaTunnelRowType(fieldNames.toArray(new String[fieldNames.size()]), seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
index e3c3cd25d..05e621bad 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceReader.java
@@ -23,21 +23,19 @@ import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.client.KuduScanner;
 import org.apache.kudu.client.RowResult;
 import org.apache.kudu.client.RowResultIterator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Deque;
 import java.util.LinkedList;
 import java.util.List;
 
+@Slf4j
 public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSplit> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(KuduSourceReader.class);
-
     private final SourceReader.Context context;
 
     private final KuduInputFormat kuduInputFormat;
@@ -79,7 +77,7 @@ public class KuduSourceReader implements SourceReader<SeaTunnelRow, KuduSourceSp
         }
         if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
             // signal to the source that we have reached the end of the data.
-            LOGGER.info("Closed the bounded fake source");
+            log.info("Closed the bounded fake source");
             context.signalNoMoreElement();
         }
 
diff --git a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
index 61a920548..3391205d8 100644
--- a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
+++ b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSourceReader.java
@@ -31,17 +31,15 @@ import com.mongodb.client.MongoClient;
 import com.mongodb.client.MongoClients;
 import com.mongodb.client.MongoCursor;
 import com.mongodb.client.model.Projections;
+import lombok.extern.slf4j.Slf4j;
 import org.bson.Document;
 import org.bson.conversions.Bson;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+@Slf4j
 public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(MongodbSourceReader.class);
-
     private final SingleSplitReaderContext context;
 
     private MongoClient client;
@@ -102,7 +100,7 @@ public class MongodbSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
         } finally {
             if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
                 // signal to the source that we have reached the end of the data.
-                LOGGER.info("Closed the bounded mongodb source");
+                log.info("Closed the bounded mongodb source");
                 context.signalNoMoreElement();
             }
         }
diff --git a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
index d37ce180d..8a86fa47b 100644
--- a/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
+++ b/seatunnel-connectors-v2/connector-socket/src/main/java/org/apache/seatunnel/connectors/seatunnel/socket/source/SocketSourceReader.java
@@ -23,8 +23,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -32,8 +31,8 @@ import java.io.InputStreamReader;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 
+@Slf4j
 public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(SocketSourceReader.class);
     private static final int CHAR_BUFFER_SIZE = 8192;
     private final SocketSourceParameter parameter;
     private final SingleSplitReaderContext context;
@@ -48,7 +47,7 @@ public class SocketSourceReader extends AbstractSingleSplitReader<SeaTunnelRow>
     @Override
     public void open() throws Exception {
         socket = new Socket();
-        LOGGER.info("connect socket server, host:[{}], port:[{}] ", this.parameter.getHost(), this.parameter.getPort());
+        log.info("connect socket server, host:[{}], port:[{}] ", this.parameter.getHost(), this.parameter.getPort());
         socket.connect(new InetSocketAddress(this.parameter.getHost(), this.parameter.getPort()), 0);
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
index e5b293c2a..cfccf1d39 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-clickhouse/src/main/java/org/apache/seatunnel/flink/clickhouse/sink/ClickhouseFileOutputFormat.java
@@ -38,11 +38,10 @@ import org.apache.seatunnel.flink.clickhouse.sink.file.ScpFileTransfer;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import ru.yandex.clickhouse.ClickHouseConnectionImpl;
 
 import java.io.BufferedReader;
@@ -66,9 +65,9 @@ import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class ClickhouseFileOutputFormat {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseFileOutputFormat.class);
     private static final String CLICKHOUSE_LOCAL_FILE_PREFIX = "/tmp/clickhouse-local/flink-file";
     private static final int UUID_LENGTH = 10;
 
@@ -195,7 +194,7 @@ public class ClickhouseFileOutputFormat {
             uuid));
         command.add("--path");
         command.add("\"" + clickhouseLocalFile + "\"");
-        LOGGER.info("Generate clickhouse local file command: {}", String.join(" ", command));
+        log.info("Generate clickhouse local file command: {}", String.join(" ", command));
         ProcessBuilder processBuilder = new ProcessBuilder("bash", "-c", String.join(" ", command));
         Process start = processBuilder.start();
         // we just wait for the process to finish
@@ -204,7 +203,7 @@ public class ClickhouseFileOutputFormat {
              BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {
             String line;
             while ((line = bufferedReader.readLine()) != null) {
-                LOGGER.info(line);
+                log.info(line);
             }
         }
         start.waitFor();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
index bf178aa3b..c0fd9cb99 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-console/src/main/java/org/apache/seatunnel/flink/console/sink/ConsoleSink.java
@@ -26,18 +26,17 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @AutoService(BaseFlinkSink.class)
+@Slf4j
 public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink, FlinkStreamSink {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsoleSink.class);
     private static final String LIMIT = "limit";
     private Integer limit = Integer.MAX_VALUE;
 
@@ -49,7 +48,7 @@ public class ConsoleSink extends RichOutputFormat<Row> implements FlinkBatchSink
         try {
             rowDataSet.first(limit).print();
         } catch (Exception e) {
-            LOGGER.error("Failed to print result! ", e);
+            log.error("Failed to print result! ", e);
         }
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
index e3a19d523..0cfeb3cd0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisOutputFormat.java
@@ -17,14 +17,13 @@
 
 package org.apache.seatunnel.flink.doris.sink;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -42,8 +41,8 @@ import java.util.regex.Pattern;
 /**
  * DorisDynamicOutputFormat
  **/
+@Slf4j
 public class DorisOutputFormat<T> extends RichOutputFormat<T> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(DorisOutputFormat.class);
     private static final long serialVersionUID = -4514164348993670086L;
     private static final long DEFAULT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(1);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -215,7 +214,7 @@ public class DorisOutputFormat<T> extends RichOutputFormat<T> {
                 batch.clear();
                 break;
             } catch (Exception e) {
-                LOGGER.error("doris sink error, retry times = {}", i, e);
+                log.error("doris sink error, retry times = {}", i, e);
                 if (i >= maxRetries) {
                     throw new IOException(e);
                 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
index ebeb20845..4d4b0e865 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-doris/src/main/java/org/apache/seatunnel/flink/doris/sink/DorisStreamLoad.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.flink.doris.sink;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
@@ -43,9 +42,9 @@ import java.util.UUID;
 /**
  * doris streamLoad
  */
+@Slf4j
 public class DorisStreamLoad implements Serializable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(DorisStreamLoad.class);
     private static final long serialVersionUID = -595233501819950489L;
     private static final List<String> DORIS_SUCCESS_STATUS = Arrays.asList("Success", "Publish Timeout");
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -84,7 +83,7 @@ public class DorisStreamLoad implements Serializable {
 
     public void load(String value) {
         LoadResponse loadResponse = loadBatch(value);
-        LOGGER.info("Streamload Response:{}", loadResponse);
+        log.info("Streamload Response:{}", loadResponse);
         if (loadResponse.status != HttpResponseStatus.OK.code()) {
             throw new RuntimeException("stream load error: " + loadResponse.respContent);
         } else {
@@ -140,7 +139,7 @@ public class DorisStreamLoad implements Serializable {
             return new LoadResponse(status, respMsg, response.toString());
         } catch (Exception e) {
             String err = "failed to stream load data with label:" + label;
-            LOGGER.warn(err, e);
+            log.warn(err, e);
             throw new RuntimeException("stream load error: " + err);
         } finally {
             if (feConn != null) {
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
index dca0c26c4..7cb18cd9a 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/sink/DruidOutputFormat.java
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.MapperFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.druid.data.input.MaxSizeSplitHintSpec;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -44,8 +45,6 @@ import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.types.Row;
 import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -57,9 +56,9 @@ import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 
+@Slf4j
 public class DruidOutputFormat extends RichOutputFormat<Row> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(DruidOutputFormat.class);
     private static final long serialVersionUID = -7410857670269773005L;
 
     private static final String DEFAULT_TIMESTAMP_COLUMN = "timestamp";
@@ -149,7 +148,7 @@ public class DruidOutputFormat extends RichOutputFormat<Row> {
             while ((responseLine = br.readLine()) != null) {
                 response.append(responseLine.trim());
             }
-            LOGGER.info("Druid write task has been sent, and the response is {}", response.toString());
+            log.info("Druid write task has been sent, and the response is {}", response.toString());
         }
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
index 83478f57d..d964bb666 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-druid/src/main/java/org/apache/seatunnel/flink/druid/source/DruidSource.java
@@ -36,14 +36,13 @@ import org.apache.seatunnel.flink.batch.FlinkBatchSource;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
@@ -54,12 +53,11 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+@Slf4j
 @AutoService(BaseFlinkSource.class)
 public class DruidSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = 8152628883440481281L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(DruidSource.class);
-
     private Config config;
     private DruidInputFormat druidInputFormat;
 
@@ -150,7 +148,7 @@ public class DruidSource implements FlinkBatchSource {
                 }
             }
         } catch (Exception e) {
-            LOGGER.warn("Failed to get column information from JDBC URL: {}", jdbcURL, e);
+            log.warn("Failed to get column information from JDBC URL: {}", jdbcURL, e);
         }
 
         int size = map.size();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
index 4687bd126..83c1d9d95 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch6/src/main/java/org/apache/seatunnel/flink/elasticsearch6/sink/ElasticsearchOutputFormat.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.flink.elasticsearch6.sink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@@ -33,16 +34,14 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.util.List;
 
+@Slf4j
 public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
 
     private static final long serialVersionUID = 2048590860723433896L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
 
     private final Config config;
 
@@ -78,7 +77,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
             try {
                 transportClient.addTransportAddresses(new TransportAddress(InetAddress.getByName(host.split(":")[0]), Integer.parseInt(host.split(":")[1])));
             } catch (Exception e) {
-                LOGGER.warn("Host '{}' parse failed.", host, e);
+                log.warn("Host '{}' parse failed.", host, e);
             }
         }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
index a55e38424..18341f7d4 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-elasticsearch7/src/main/java/org/apache/seatunnel/flink/elasticsearch/sink/ElasticsearchOutputFormat.java
@@ -21,6 +21,7 @@ import static org.apache.seatunnel.flink.elasticsearch.config.Config.HOSTS;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
@@ -35,16 +36,14 @@ import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.transport.client.PreBuiltTransportClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.InetAddress;
 import java.util.List;
 
+@Slf4j
 public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
 
     private static final long serialVersionUID = 2048590860723433896L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchOutputFormat.class);
 
     private final Config config;
 
@@ -80,7 +79,7 @@ public class ElasticsearchOutputFormat<T> extends RichOutputFormat<T> {
             try {
                 transportClient.addTransportAddresses(new TransportAddress(InetAddress.getByName(host.split(":")[0]), Integer.parseInt(host.split(":")[1])));
             } catch (Exception e) {
-                LOGGER.warn("Host '{}' parse failed.", host, e);
+                log.warn("Host '{}' parse failed.", host, e);
             }
         }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
index b8d3400c8..c1c0ae73e 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/file/sink/FileSink.java
@@ -30,6 +30,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.common.io.FileOutputFormat;
 import org.apache.flink.api.common.serialization.SimpleStringEncoder;
 import org.apache.flink.api.java.DataSet;
@@ -43,16 +44,13 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig
 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 @AutoService(BaseFlinkSink.class)
 public class FileSink implements FlinkStreamSink, FlinkBatchSink {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FileSink.class);
-
     private static final long serialVersionUID = -1648045076508797396L;
 
     private static final String PATH = "path";
@@ -114,7 +112,7 @@ public class FileSink implements FlinkStreamSink, FlinkBatchSink {
                 outputFormat = new TextOutputFormat<>(filePath);
                 break;
             default:
-                LOGGER.warn(" unknown file_format [{}],only support json,csv,text", format);
+                log.warn(" unknown file_format [{}],only support json,csv,text", format);
                 break;
 
         }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
index 1ccb3cb98..67530ea1a 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/sink/JdbcSink.java
@@ -39,6 +39,7 @@ import org.apache.seatunnel.flink.stream.FlinkStreamSink;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -51,8 +52,6 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -60,10 +59,10 @@ import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Arrays;
 
+@Slf4j
 @AutoService(BaseFlinkSink.class)
 public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSink.class);
     private static final long serialVersionUID = 3677571223952518115L;
     private static final int DEFAULT_BATCH_SIZE = 5000;
     private static final int DEFAULT_MAX_RETRY_TIMES = 3;
@@ -189,11 +188,11 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     private void executePreSql() {
         if (StringUtils.isNotBlank(preSql)) {
-            LOGGER.info("Starting to execute pre sql: \n {}", preSql);
+            log.info("Starting to execute pre sql: \n {}", preSql);
             try {
                 executeSql(preSql);
             } catch (SQLException e) {
-                LOGGER.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, e);
+                log.error("Execute pre sql failed, pre sql is : \n {} \n", preSql, e);
                 throw new RuntimeException(e);
             }
         }
@@ -201,11 +200,11 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     private void executePostSql() {
         if (StringUtils.isNotBlank(postSql)) {
-            LOGGER.info("Starting to execute post sql: \n {}", postSql);
+            log.info("Starting to execute post sql: \n {}", postSql);
             try {
                 executeSql(postSql);
             } catch (SQLException e) {
-                LOGGER.error("Execute post sql failed, post sql is : \n {} \n", postSql, e);
+                log.error("Execute post sql failed, post sql is : \n {} \n", postSql, e);
                 if (!ignorePostSqlExceptions) {
                     throw new RuntimeException(e);
                 }
@@ -215,10 +214,10 @@ public class JdbcSink implements FlinkStreamSink, FlinkBatchSink {
 
     private void executeSql(String sql) throws SQLException {
         try (Connection connection = DriverManager.getConnection(dbUrl, username, password);
-            Statement statement = connection.createStatement()) {
+              Statement statement = connection.createStatement()) {
 
             statement.execute(sql);
-            LOGGER.info("Executed sql successfully.");
+            log.info("Executed sql successfully.");
         }
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
index c01e1e0ee..efc0130a0 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-jdbc/src/main/java/org/apache/seatunnel/flink/jdbc/source/JdbcSource.java
@@ -49,6 +49,7 @@ import org.apache.seatunnel.flink.jdbc.input.TypeInformationMap;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -57,8 +58,6 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;
 import org.apache.flink.connector.jdbc.split.JdbcParameterValuesProvider;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -70,11 +69,11 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
 
+@Slf4j
 @AutoService(BaseFlinkSource.class)
 public class JdbcSource implements FlinkBatchSource {
 
     private static final long serialVersionUID = -3349505356339446415L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSource.class);
     private static final int DEFAULT_FETCH_SIZE = 10000;
 
     private Config config;
@@ -205,7 +204,7 @@ public class JdbcSource implements FlinkBatchSource {
                 return getRowInfo(rs.getMetaData(), databaseDialect);
             }
         } catch (SQLException e) {
-            LOGGER.warn("get row type info exception", e);
+            log.warn("get row type info exception", e);
         }
         return new LinkedHashMap<>();
     }
@@ -248,7 +247,7 @@ public class JdbcSource implements FlinkBatchSource {
             return new PostgresTypeInformationMap();
         } else if (StringUtils.containsIgnoreCase(databaseDialect, "oracle")) {
             return new OracleTypeInformationMap();
-        } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")){
+        } else if (StringUtils.containsIgnoreCase(databaseDialect, "Hive")) {
             return new HiveTypeInformationMap();
         } else {
             return new DefaultTypeInformationMap();
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
index dbb60c7cf..f962fee4d 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-kafka/src/main/java/org/apache/seatunnel/flink/kafka/source/KafkaTableStream.java
@@ -33,6 +33,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.auto.service.AutoService;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.Table;
@@ -42,17 +43,15 @@ import org.apache.flink.table.descriptors.Kafka;
 import org.apache.flink.table.descriptors.Rowtime;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.Properties;
 
+@Slf4j
 @AutoService(BaseFlinkSource.class)
 public class KafkaTableStream implements FlinkStreamSource {
 
     private static final long serialVersionUID = 5287018194573371428L;
-    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableStream.class);
 
     private Config config;
 
@@ -175,7 +174,7 @@ public class KafkaTableStream implements FlinkStreamSource {
         try {
             return SchemaUtil.setFormat(format, config);
         } catch (Exception e) {
-            LOGGER.warn("set format exception", e);
+            log.warn("set format exception", e);
         }
         throw new RuntimeException("format config error");
     }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index e45872848..4c70e776d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -22,18 +22,17 @@ import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.exception.CommandException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class Seatunnel {
-    private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
 
     /**
      * This method is the entrypoint of SeaTunnel.
      *
      * @param command commandArgs
-     * @param <T>         commandType
+     * @param <T>     commandType
      */
     public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
         try {
@@ -48,26 +47,26 @@ public class Seatunnel {
     }
 
     private static void showConfigError(Throwable throwable) {
-        LOGGER.error(
+        log.error(
             "\n\n===============================================================================\n\n");
         String errorMsg = throwable.getMessage();
-        LOGGER.error("Config Error:\n");
-        LOGGER.error("Reason: {} \n", errorMsg);
-        LOGGER.error(
+        log.error("Config Error:\n");
+        log.error("Reason: {} \n", errorMsg);
+        log.error(
             "\n===============================================================================\n\n\n");
     }
 
     private static void showFatalError(Throwable throwable) {
-        LOGGER.error(
+        log.error(
             "\n\n===============================================================================\n\n");
         String errorMsg = throwable.getMessage();
-        LOGGER.error("Fatal Error, \n");
+        log.error("Fatal Error, \n");
         // FIX
-        LOGGER.error(
+        log.error(
             "Please submit bug report in https://github.com/apache/incubator-seatunnel/issues\n");
-        LOGGER.error("Reason:{} \n", errorMsg);
-        LOGGER.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
-        LOGGER.error(
+        log.error("Reason:{} \n", errorMsg);
+        log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
+        log.error(
             "\n===============================================================================\n\n\n");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
index b14b6fa96..8cc825b2b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/BaseTaskExecuteCommand.java
@@ -27,8 +27,7 @@ import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.utils.AsciiArtUtils;
 import org.apache.seatunnel.core.base.utils.CompressionUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
 import java.util.List;
@@ -39,10 +38,9 @@ import java.util.Objects;
  *
  * @param <T> command args.
  */
+@Slf4j
 public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E extends RuntimeEnv> implements Command<T> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
-
     /**
      * Check the plugin config.
      *
@@ -117,7 +115,7 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
                     checkResult = CheckResult.error(e.getMessage());
                 }
                 if (!checkResult.isSuccess()) {
-                    LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
+                    log.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
                     System.exit(-1); // invalid configuration
                 }
             }
@@ -128,11 +126,11 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
         final DeployMode mode = Common.getDeployMode();
         if (DeployMode.CLUSTER == mode) {
 
-            LOGGER.info("preparing cluster mode work dir files...");
+            log.info("preparing cluster mode work dir files...");
             File workDir = new File(".");
 
             for (File file : Objects.requireNonNull(workDir.listFiles())) {
-                LOGGER.warn("\t list file: {} ", file.getAbsolutePath());
+                log.warn("\t list file: {} ", file.getAbsolutePath());
             }
             // decompress plugin dir
             File compressedFile = new File("plugins.tar.gz");
@@ -141,10 +139,10 @@ public abstract class BaseTaskExecuteCommand<T extends AbstractCommandArgs, E ex
                 File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
                 CompressionUtils.unTar(tempFile, workDir);
             } catch (Exception e) {
-                LOGGER.error("failed to decompress plugins.tar.gz", e);
+                log.error("failed to decompress plugins.tar.gz", e);
                 System.exit(-1);
             }
-            LOGGER.info("succeeded to decompress plugins.tar.gz");
+            log.info("succeeded to decompress plugins.tar.gz");
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 5f80ae407..4a1d4485b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -24,19 +24,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Used to build the {@link  Config} from file.
- *
  */
+@Slf4j
 public class ConfigBuilder {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
-
     private final Path configFile;
     private final Config config;
 
@@ -51,7 +48,7 @@ public class ConfigBuilder {
             throw new ConfigRuntimeException("Please specify config file");
         }
 
-        LOGGER.info("Loading config file: {}", configFile);
+        log.info("Loading config file: {}", configFile);
 
         // variables substitution / variables resolution order:
         // config file --> system environment --> java properties
@@ -62,7 +59,7 @@ public class ConfigBuilder {
                 ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
         ConfigRenderOptions options = ConfigRenderOptions.concise().setFormatted(true);
-        LOGGER.info("parsed config file: {}", config.root().render(options));
+        log.info("parsed config file: {}", config.root().render(options));
         return config;
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
index 0c4fd178c..25c4f4d7d 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ExecutionFactory.java
@@ -30,18 +30,16 @@ import org.apache.seatunnel.spark.batch.SparkBatchExecution;
 import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
 import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 /**
  * Used to create {@link Execution}.
  *
  * @param <ENVIRONMENT> environment type
  */
+@Slf4j
 public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);
-
     public AbstractExecutionContext<ENVIRONMENT> executionContext;
 
     public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
@@ -77,7 +75,7 @@ public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
             default:
                 throw new IllegalArgumentException("No suitable engine");
         }
-        LOGGER.info("current execution is [{}]", execution.getClass().getName());
+        log.info("current execution is [{}]", execution.getClass().getName());
         return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
index 326d5d377..fb9b75320 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/AsciiArtUtils.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.core.base.utils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.awt.Font;
 import java.awt.Graphics;
@@ -26,10 +25,9 @@ import java.awt.Graphics2D;
 import java.awt.RenderingHints;
 import java.awt.image.BufferedImage;
 
+@Slf4j
 public final class AsciiArtUtils {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AsciiArtUtils.class);
-
     private static final int FONT_SIZE = 24;
     private static final int DRAW_X = 6;
     private static final int RGB = -16777216;
@@ -66,7 +64,7 @@ public final class AsciiArtUtils {
             if (sb.toString().trim().isEmpty()) {
                 continue;
             }
-            LOGGER.info(String.valueOf(sb));
+            log.info(String.valueOf(sb));
         }
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
index 87673eb03..636c2c9cb 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/utils/CompressionUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.base.utils;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -24,8 +25,6 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -44,10 +43,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 
+@Slf4j
 public final class CompressionUtils {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(CompressionUtils.class);
-
     private CompressionUtils() {
     }
 
@@ -58,7 +56,7 @@ public final class CompressionUtils {
      * @param outputFile the output tarball file.
      */
     public static void tarGzip(final Path inputDir, final Path outputFile) throws IOException {
-        LOGGER.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
+        log.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
         try (OutputStream out = Files.newOutputStream(outputFile);
              BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
              GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bufferedOut);
@@ -78,9 +76,9 @@ public final class CompressionUtils {
                 }
             });
             tarOut.finish();
-            LOGGER.info("Creating tar file '{}'.", outputFile);
+            log.info("Creating tar file '{}'.", outputFile);
         } catch (IOException e) {
-            LOGGER.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
+            log.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
             throw e;
         }
     }
@@ -99,7 +97,7 @@ public final class CompressionUtils {
      */
     public static void unTar(final File inputFile, final File outputDir) throws IOException, ArchiveException {
 
-        LOGGER.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+        log.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
 
         final List<File> untaredFiles = new LinkedList<>();
         try (final InputStream is = new FileInputStream(inputFile);
@@ -111,15 +109,15 @@ public final class CompressionUtils {
                     throw new IllegalStateException("Bad zip entry");
                 }
                 if (entry.isDirectory()) {
-                    LOGGER.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
+                    log.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
                     if (!outputFile.exists()) {
-                        LOGGER.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
+                        log.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
                         if (!outputFile.mkdirs()) {
                             throw new IllegalStateException(String.format("Couldn't create directory %s.", outputFile.getAbsolutePath()));
                         }
                     }
                 } else {
-                    LOGGER.info("Creating output file {}.", outputFile.getAbsolutePath());
+                    log.info("Creating output file {}.", outputFile.getAbsolutePath());
                     File outputParentFile = outputFile.getParentFile();
                     if (outputParentFile != null && !outputParentFile.exists()) {
                         outputParentFile.mkdirs();
@@ -147,7 +145,7 @@ public final class CompressionUtils {
      */
     public static File unGzip(final File inputFile, final File outputDir) throws IOException {
 
-        LOGGER.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+        log.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
 
         final File outputFile = new File(outputDir, inputFile.getName().substring(0, inputFile.getName().length() - 3));
 
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
index 5777805a7..59b570283 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/classloader/CustomClassLoader.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.core.sql.classloader;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -26,10 +25,9 @@ import java.net.URLClassLoader;
 import java.nio.file.Path;
 
 // TODO: maybe a unified plugin-style discovery mechanism is better.
+@Slf4j
 public class CustomClassLoader extends URLClassLoader {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(CustomClassLoader.class);
-
     public CustomClassLoader() {
         super(new URL[0]);
     }
@@ -45,7 +43,7 @@ public class CustomClassLoader extends URLClassLoader {
         try {
             this.addURL(jarPath.toUri().toURL());
         } catch (MalformedURLException e) {
-            LOGGER.error("Failed to add jar to classloader. Jar: {}", jarPath, e);
+            log.error("Failed to add jar to classloader. Jar: {}", jarPath, e);
         }
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
index 044262ee8..7cc0db5a8 100644
--- a/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
+++ b/seatunnel-core/seatunnel-core-flink-sql/src/main/java/org/apache/seatunnel/core/sql/job/Executor.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.core.sql.classloader.CustomClassLoader;
 import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -37,8 +38,6 @@ import org.apache.flink.table.factories.Factory;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -52,14 +51,13 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class Executor {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(Executor.class);
-
     private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
     private static final int FLINK_SQL_SET_OPERANDS = 3;
 
-    private static CustomClassLoader CLASSLOADER = new CustomClassLoader();
+    private static final CustomClassLoader CLASSLOADER = new CustomClassLoader();
 
     private static final String CONNECTOR_IDENTIFIER = "connector";
     private static final String SQL_CONNECTOR_PREFIX = "flink-sql";
@@ -156,7 +154,7 @@ public class Executor {
             .collect(Collectors.toList());
 
         if (connectorFiles.size() > 1) {
-            LOGGER.warn("Found more than one connector jars for {}. Only the first one will be loaded.", connectorType);
+            log.warn("Found more than one connector jars for {}. Only the first one will be loaded.", connectorType);
         }
 
         File connectorFile = connectorFiles.size() >= 1 ? connectorFiles.get(0) : null;
@@ -179,7 +177,7 @@ public class Executor {
                 configuration.set(PipelineOptions.JARS, jars);
                 configuration.set(PipelineOptions.CLASSPATHS, classpath);
             } catch (MalformedURLException ignored) {
-                LOGGER.error("Failed to load connector {}. Connector file: {}", connectorType, connectorFile.getAbsolutePath());
+                log.error("Failed to load connector {}. Connector file: {}", connectorType, connectorFile.getAbsolutePath());
             }
         }
     }
@@ -199,7 +197,7 @@ public class Executor {
         return Optional.empty();
     }
 
-    private static Optional<Pair<String, String>> operandConverter(String[] operands){
+    private static Optional<Pair<String, String>> operandConverter(String[] operands) {
         if (operands.length != FLINK_SQL_SET_OPERANDS) {
             return Optional.empty();
         }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
index f03522a04..253b69911 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.config.FlinkApiConfigChecker;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Used to check the Flink conf is validated.
  */
+@Slf4j
 public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
-
     private final FlinkCommandArgs flinkCommandArgs;
 
     public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
@@ -47,6 +45,6 @@ public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
         Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
         new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
-        LOGGER.info("config OK !");
+        log.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 0f4a93077..6aef6317d 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.spark.config.SparkApiConfigChecker;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Used to validate the spark task conf is validated.
  */
+@Slf4j
 public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkConfValidateCommand.class);
-
     private final SparkCommandArgs sparkCommandArgs;
 
     public SparkConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
@@ -47,6 +45,6 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
         Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
         ConfigBuilder configBuilder = new ConfigBuilder(confPath);
         new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
-        LOGGER.info("config OK !");
+        log.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
index 330aadb85..e4de62add 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/Seatunnel.java
@@ -22,18 +22,17 @@ import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.command.CommandArgs;
 import org.apache.seatunnel.core.starter.exception.CommandException;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class Seatunnel {
-    private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
 
     /**
      * This method is the entrypoint of SeaTunnel.
      *
      * @param command commandArgs
-     * @param <T>         commandType
+     * @param <T>     commandType
      */
     public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
         try {
@@ -48,26 +47,26 @@ public class Seatunnel {
     }
 
     private static void showConfigError(Throwable throwable) {
-        LOGGER.error(
+        log.error(
             "\n\n===============================================================================\n\n");
         String errorMsg = throwable.getMessage();
-        LOGGER.error("Config Error:\n");
-        LOGGER.error("Reason: {} \n", errorMsg);
-        LOGGER.error(
+        log.error("Config Error:\n");
+        log.error("Reason: {} \n", errorMsg);
+        log.error(
             "\n===============================================================================\n\n\n");
     }
 
     private static void showFatalError(Throwable throwable) {
-        LOGGER.error(
+        log.error(
             "\n\n===============================================================================\n\n");
         String errorMsg = throwable.getMessage();
-        LOGGER.error("Fatal Error, \n");
+        log.error("Fatal Error, \n");
         // FIX
-        LOGGER.error(
+        log.error(
             "Please submit bug report in https://github.com/apache/incubator-seatunnel/issues\n");
-        LOGGER.error("Reason:{} \n", errorMsg);
-        LOGGER.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
-        LOGGER.error(
+        log.error("Reason:{} \n", errorMsg);
+        log.error("Exception StackTrace:{} ", ExceptionUtils.getStackTrace(throwable));
+        log.error(
             "\n===============================================================================\n\n\n");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
index 968aa60a4..e96074296 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigBuilder.java
@@ -24,19 +24,16 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Used to build the {@link  Config} from file.
- *
  */
+@Slf4j
 public class ConfigBuilder {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
-
     private static final String PLUGIN_NAME_KEY = "plugin_name";
     private final Path configFile;
     private final Config config;
@@ -52,7 +49,7 @@ public class ConfigBuilder {
             throw new ConfigRuntimeException("Please specify config file");
         }
 
-        LOGGER.info("Loading config file: {}", configFile);
+        log.info("Loading config file: {}", configFile);
 
         // variables substitution / variables resolution order:
         // config file --> system environment --> java properties
@@ -63,7 +60,7 @@ public class ConfigBuilder {
                 ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
         ConfigRenderOptions options = ConfigRenderOptions.concise().setFormatted(true);
-        LOGGER.info("parsed config file: {}", config.root().render(options));
+        log.info("parsed config file: {}", config.root().render(options));
         return config;
     }
 
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
index b0c47d2ac..24a029860 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EngineType.java
@@ -20,8 +20,7 @@ package org.apache.seatunnel.core.starter.config;
 public enum EngineType {
     SPARK("spark"),
     FLINK("flink"),
-    SEATUNNEL("seatunnel"),
-    ;
+    SEATUNNEL("seatunnel");
 
     private final String engine;
 
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
index fa572f23e..3c38b74d4 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/AsciiArtUtils.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.core.starter.utils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.awt.Font;
 import java.awt.Graphics;
@@ -26,10 +25,9 @@ import java.awt.Graphics2D;
 import java.awt.RenderingHints;
 import java.awt.image.BufferedImage;
 
+@Slf4j
 public final class AsciiArtUtils {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AsciiArtUtils.class);
-
     private static final int FONT_SIZE = 24;
     private static final int DRAW_X = 6;
     private static final int RGB = -16777216;
@@ -66,7 +64,7 @@ public final class AsciiArtUtils {
             if (sb.toString().trim().isEmpty()) {
                 continue;
             }
-            LOGGER.info(String.valueOf(sb));
+            log.info(String.valueOf(sb));
         }
     }
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
index 0554e7091..66efe2923 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/CompressionUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.utils;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.compress.archivers.ArchiveException;
 import org.apache.commons.compress.archivers.ArchiveStreamFactory;
 import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -24,8 +25,6 @@ import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
 import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
 import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
 import org.apache.commons.compress.utils.IOUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -44,10 +43,9 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.zip.GZIPInputStream;
 
+@Slf4j
 public final class CompressionUtils {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(CompressionUtils.class);
-
     private CompressionUtils() {
     }
 
@@ -58,7 +56,7 @@ public final class CompressionUtils {
      * @param outputFile the output tarball file.
      */
     public static void tarGzip(final Path inputDir, final Path outputFile) throws IOException {
-        LOGGER.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
+        log.info("Tar directory '{}' to file '{}'.", inputDir, outputFile);
         try (OutputStream out = Files.newOutputStream(outputFile);
              BufferedOutputStream bufferedOut = new BufferedOutputStream(out);
              GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(bufferedOut);
@@ -78,9 +76,9 @@ public final class CompressionUtils {
                 }
             });
             tarOut.finish();
-            LOGGER.info("Creating tar file '{}'.", outputFile);
+            log.info("Creating tar file '{}'.", outputFile);
         } catch (IOException e) {
-            LOGGER.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
+            log.error("Error when tar directory '{}' to file '{}'.", inputDir, outputFile);
             throw e;
         }
     }
@@ -99,7 +97,7 @@ public final class CompressionUtils {
      */
     public static void unTar(final File inputFile, final File outputDir) throws IOException, ArchiveException {
 
-        LOGGER.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+        log.info("Untaring {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
 
         final List<File> untaredFiles = new LinkedList<>();
         try (final InputStream is = new FileInputStream(inputFile);
@@ -111,15 +109,15 @@ public final class CompressionUtils {
                     throw new IllegalStateException("Bad zip entry");
                 }
                 if (entry.isDirectory()) {
-                    LOGGER.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
+                    log.info("Attempting to write output directory {}.", outputFile.getAbsolutePath());
                     if (!outputFile.exists()) {
-                        LOGGER.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
+                        log.info("Attempting to create output directory {}.", outputFile.getAbsolutePath());
                         if (!outputFile.mkdirs()) {
                             throw new IllegalStateException(String.format("Couldn't create directory %s.", outputFile.getAbsolutePath()));
                         }
                     }
                 } else {
-                    LOGGER.info("Creating output file {}.", outputFile.getAbsolutePath());
+                    log.info("Creating output file {}.", outputFile.getAbsolutePath());
                     final OutputStream outputFileStream = new FileOutputStream(outputFile);
                     IOUtils.copy(debInputStream, outputFileStream);
                     outputFileStream.close();
@@ -143,7 +141,7 @@ public final class CompressionUtils {
      */
     public static File unGzip(final File inputFile, final File outputDir) throws IOException {
 
-        LOGGER.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
+        log.info("Unzipping {} to dir {}.", inputFile.getAbsolutePath(), outputDir.getAbsolutePath());
 
         final File outputFile = new File(outputDir, inputFile.getName().substring(0, inputFile.getName().length() - 3));
 
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
index 5ca54a392..daedfaf34 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.starter.flink.config.FlinkApiConfigChecker;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Use to validate the configuration of the SeaTunnel API.
  */
+@Slf4j
 public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
-
     private final FlinkCommandArgs flinkCommandArgs;
 
     public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
@@ -48,6 +46,6 @@ public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
         // todo: validate the config by new api
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
         new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
-        LOGGER.info("config OK !");
+        log.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
index 9fbbca174..19395db1a 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
@@ -26,8 +26,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
@@ -35,10 +34,9 @@ import java.nio.file.Path;
  * todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
  * This command is used to execute the Flink job by SeaTunnel new API.
  */
+@Slf4j
 public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiTaskExecuteCommand.class);
-
     private final FlinkCommandArgs flinkCommandArgs;
 
     public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index 51b36e7d2..4407a3e13 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -28,10 +28,9 @@ import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -43,9 +42,10 @@ import java.util.stream.Collectors;
 /**
  * Used to execute a SeaTunnelTask.
  */
+
+@Slf4j
 public class FlinkExecution implements TaskExecution {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
     private final FlinkEnvironment flinkEnvironment;
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
@@ -68,7 +68,7 @@ public class FlinkExecution implements TaskExecution {
         dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
         sinkPluginExecuteProcessor.execute(dataStreams);
 
-        LOGGER.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
+        log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
         try {
             flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
         } catch (Exception e) {
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
index 7b9d6782a..213a8d900 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
@@ -24,17 +24,16 @@ import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.starter.spark.config.SparkApiConfigChecker;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Use to validate the configuration of the SeaTunnel API.
  */
-public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiConfValidateCommand.class);
+@Slf4j
+public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
 
     private final SparkCommandArgs sparkCommandArgs;
 
@@ -48,6 +47,6 @@ public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
         // todo: validate the config by new api
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
         new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
-        LOGGER.info("config OK !");
+        log.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index 2eb472937..5fdc4e28d 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -26,8 +26,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
@@ -35,10 +34,9 @@ import java.nio.file.Path;
  * todo: do we need to move these class to a new module? since this may cause version conflict with the old Spark version.
  * This command is used to execute the Spark job by SeaTunnel new API.
  */
+@Slf4j
 public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiTaskExecuteCommand.class);
-
     private final SparkCommandArgs sparkCommandArgs;
 
     public SparkApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
@@ -53,7 +51,7 @@ public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
             SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
             seaTunnelTaskExecution.execute();
         } catch (Exception e) {
-            LOGGER.error("Run SeaTunnel on spark failed.", e);
+            log.error("Run SeaTunnel on spark failed.", e);
             throw new CommandExecuteException(e.getMessage());
         }
     }
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 62890277c..ed2e60e91 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -25,17 +25,16 @@ import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 public class SparkExecution {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(SparkExecution.class);
     private final SparkEnvironment sparkEnvironment;
     private final PluginExecuteProcessor sourcePluginExecuteProcessor;
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
@@ -56,6 +55,6 @@ public class SparkExecution {
         datasets = transformPluginExecuteProcessor.execute(datasets);
         sinkPluginExecuteProcessor.execute(datasets);
 
-        LOGGER.info("Spark Execution started");
+        log.info("Spark Execution started");
     }
 }
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
index 20bc93659..0a05d7c4d 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
@@ -24,18 +24,16 @@ import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
 import org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
 
 /**
  * Use to validate the configuration of the SeaTunnel API.
  */
+@Slf4j
 public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ApiConfValidateCommand.class);
-
     private final ClientCommandArgs clientCommandArgs;
 
     public ApiConfValidateCommand(ClientCommandArgs clientCommandArgs) {
@@ -47,6 +45,6 @@ public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
         Path configPath = FileUtils.getConfigPath(clientCommandArgs);
         ConfigBuilder configBuilder = new ConfigBuilder(configPath);
         new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
-        LOGGER.info("config OK !");
+        log.info("config OK !");
     }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
index cf412da33..60746e25b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/xa/XaGroupOpsImplIT.java
@@ -26,13 +26,12 @@ import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.DataSourceUtils;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.options.JdbcConnectionOptions;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.lifecycle.Startables;
@@ -45,10 +44,10 @@ import javax.transaction.xa.Xid;
 
 import java.util.stream.Stream;
 
+@Slf4j
 @Disabled("Temporary fast fix, reason: JdbcDatabaseContainer: ClassNotFoundException: com.mysql.jdbc.Driver")
 class XaGroupOpsImplIT {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(XaGroupOpsImplIT.class);
     private MySQLContainer<?> mc;
     private XaGroupOps xaGroupOps;
     private SemanticXidGenerator xidGenerator;
@@ -61,7 +60,7 @@ class XaGroupOpsImplIT {
         // Non-root users need to grant XA_RECOVER_ADMIN permission
         mc = new MySQLContainer<>(DockerImageName.parse("mysql:8.0.29"))
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(mc)).join();
 
         jdbcConnectionOptions = JdbcConnectionOptions.builder()
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index fcd2815b2..03d2cca0c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -32,21 +32,19 @@ import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
 import com.google.common.collect.Lists;
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import lombok.extern.slf4j.Slf4j;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+@Slf4j
 @Disabled("Disabled because connector-v2 jar dist not exist")
 public class JobExecutionIT {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JobExecutionIT.class);
-
     @BeforeAll
     public static void beforeClass() throws Exception {
         SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
index 792496a4e..12f4b2a06 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/FakeSourceToJdbcIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
 import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -45,8 +44,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+@Slf4j
 public class FakeSourceToJdbcIT extends FlinkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
     private PostgreSQLContainer<?> psl;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
@@ -56,9 +55,9 @@ public class FakeSourceToJdbcIT extends FlinkContainer {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(psl)).join();
-        LOGGER.info("PostgreSql container started");
+        log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
index 351f0e93e..2d0a75e1e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcMysqlIT.java
@@ -25,12 +25,11 @@ import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MySQLContainer;
@@ -57,8 +56,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Slf4j
 public class JdbcMysqlIT extends FlinkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMysqlIT.class);
     private MySQLContainer<?> mc;
     private Config config;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -71,9 +70,9 @@ public class JdbcMysqlIT extends FlinkContainer {
             .withNetwork(NETWORK)
             .withNetworkAliases("mysql")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(mc)).join();
-        LOGGER.info("Mysql container started");
+        log.info("Mysql container started");
         Class.forName(mc.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
index 8fa6e6e81..f946b4ee9 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcPostgresIT.java
@@ -26,8 +26,6 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -52,7 +50,6 @@ import java.util.stream.Stream;
 
 @Slf4j
 public class JdbcPostgresIT extends FlinkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
     private PostgreSQLContainer<?> pg;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
@@ -64,9 +61,9 @@ public class JdbcPostgresIT extends FlinkContainer {
             .withNetworkAliases("postgresql")
             .withCommand("postgres -c max_prepared_transactions=100")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(pg)).join();
-        LOGGER.info("Postgres container started");
+        log.info("Postgres container started");
         Class.forName(pg.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
index 3a42c1d3a..64e09c85d 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSourceToConsoleIT.java
@@ -21,12 +21,11 @@ import static org.awaitility.Awaitility.given;
 
 import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -43,8 +42,8 @@ import java.sql.Statement;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+@Slf4j
 public class JdbcSourceToConsoleIT extends FlinkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
     private PostgreSQLContainer<?> psl;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
@@ -54,9 +53,9 @@ public class JdbcSourceToConsoleIT extends FlinkContainer {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(psl)).join();
-        LOGGER.info("PostgreSql container started");
+        log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
index 4790c2bb7..fbd87ee9c 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-clickhouse-e2e/src/test/java/org/apache/seatunnel/e2e/flink/clickhouse/FakeSourceToClickhouseIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
 import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -44,23 +43,22 @@ import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+@Slf4j
 public class FakeSourceToClickhouseIT extends FlinkContainer {
 
     private GenericContainer<?> clickhouseServer;
     private BalancedClickhouseDataSource dataSource;
     private static final String CLICKHOUSE_DOCKER_IMAGE = "yandex/clickhouse-server:22.1.3.7";
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToClickhouseIT.class);
-
     @BeforeEach
     public void startClickhouseContainer() throws InterruptedException {
         clickhouseServer = new GenericContainer<>(CLICKHOUSE_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases("clickhouse")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         clickhouseServer.setPortBindings(Lists.newArrayList("8123:8123"));
         Startables.deepStart(Stream.of(clickhouseServer)).join();
-        LOGGER.info("Clickhouse container started");
+        log.info("Clickhouse container started");
         // wait for clickhouse fully start
         dataSource = createDatasource();
         given().ignoreExceptions()
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
index d861e8b4d..68a38aa4b 100644
--- a/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
+++ b/seatunnel-e2e/seatunnel-flink-e2e/seatunnel-connector-flink-file-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
@@ -19,16 +19,14 @@ package org.apache.seatunnel.e2e.flink.file;
 
 import org.apache.seatunnel.e2e.flink.FlinkContainer;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 
+@Slf4j
 public class FakeSourceToFileIT extends FlinkContainer {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToFileIT.class);
-
     @Test
     public void testFakeSource2FileSink() throws Exception {
         Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_file.conf");
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
index 217731032..08eb08535 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcMysqlIT.java
@@ -25,12 +25,11 @@ import org.apache.seatunnel.e2e.spark.SparkContainer;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MySQLContainer;
@@ -57,8 +56,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Slf4j
 public class JdbcMysqlIT extends SparkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcMysqlIT.class);
     private MySQLContainer<?> mc;
     private Config config;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar";
@@ -71,9 +70,9 @@ public class JdbcMysqlIT extends SparkContainer {
             .withNetwork(NETWORK)
             .withNetworkAliases("mysql")
             .withUsername("root")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(mc)).join();
-        LOGGER.info("Mysql container started");
+        log.info("Mysql container started");
         Class.forName(mc.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
index bbc2bae7f..03770e8a5 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcPostgresIT.java
@@ -21,12 +21,11 @@ import static org.awaitility.Awaitility.given;
 
 import org.apache.seatunnel.e2e.spark.SparkContainer;
 
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PostgreSQLContainer;
@@ -49,8 +48,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+@Slf4j
 public class JdbcPostgresIT extends SparkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcPostgresIT.class);
     private PostgreSQLContainer<?> pg;
     private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/org/postgresql/postgresql/42.3.3/postgresql-42.3.3.jar";
 
@@ -61,9 +60,9 @@ public class JdbcPostgresIT extends SparkContainer {
             .withNetwork(NETWORK)
             .withNetworkAliases("postgresql")
             .withCommand("postgres -c max_prepared_transactions=100")
-            .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+            .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(pg)).join();
-        LOGGER.info("Postgres container started");
+        log.info("Postgres container started");
         Class.forName(pg.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
index c2d2a1426..9051c17e7 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/FakeSourceToJdbcIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
 import org.apache.seatunnel.e2e.spark.SparkContainer;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -44,8 +43,8 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+@Slf4j
 public class FakeSourceToJdbcIT extends SparkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceToJdbcIT.class);
     private PostgreSQLContainer<?> psl;
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -54,9 +53,9 @@ public class FakeSourceToJdbcIT extends SparkContainer {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
                 .withNetwork(NETWORK)
                 .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+                .withLogConsumer(new Slf4jLogConsumer(log));
         Startables.deepStart(Stream.of(psl)).join();
-        LOGGER.info("PostgreSql container started");
+        log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
index cf9be5e9e..eb999dad8 100644
--- a/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
+++ b/seatunnel-e2e/seatunnel-spark-e2e/seatunnel-connector-spark-jdbc-e2e/src/test/java/org/apache/seatunnel/e2e/spark/jdbc/JdbcSourceToConsoleIT.java
@@ -22,12 +22,11 @@ import static org.awaitility.Awaitility.given;
 import org.apache.seatunnel.e2e.spark.SparkContainer;
 
 import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.PostgreSQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -43,8 +42,8 @@ import java.sql.Statement;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 
+@Slf4j
 public class JdbcSourceToConsoleIT extends SparkContainer {
-    private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSourceToConsoleIT.class);
     private PostgreSQLContainer<?> psl;
 
     @SuppressWarnings("checkstyle:MagicNumber")
@@ -53,10 +52,10 @@ public class JdbcSourceToConsoleIT extends SparkContainer {
         psl = new PostgreSQLContainer<>(DockerImageName.parse("postgres:alpine3.16"))
                 .withNetwork(NETWORK)
                 .withNetworkAliases("postgresql")
-                .withLogConsumer(new Slf4jLogConsumer(LOGGER));
+                .withLogConsumer(new Slf4jLogConsumer(log));
         psl.setPortBindings(Lists.newArrayList("33306:3306"));
         Startables.deepStart(Stream.of(psl)).join();
-        LOGGER.info("PostgreSql container started");
+        log.info("PostgreSql container started");
         Class.forName(psl.getDriverClassName());
         given().ignoreExceptions()
             .await()
diff --git a/seatunnel-engine/seatunnel-engine-common/pom.xml b/seatunnel-engine/seatunnel-engine-common/pom.xml
index e45e0b6cc..a4d243670 100644
--- a/seatunnel-engine/seatunnel-engine-common/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-common/pom.xml
@@ -40,7 +40,7 @@
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api</artifactId>
-            <version>${version}</version>
+            <version>${project.version}</version>
         </dependency>
     </dependencies>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
index 8fffecddd..c75e45dad 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/AbstractPluginDiscovery.java
@@ -25,10 +25,9 @@ import org.apache.seatunnel.common.utils.ReflectionUtils;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
@@ -46,9 +45,9 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
+@Slf4j
 public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractPluginDiscovery.class);
     private final Path pluginDir;
 
     /**
@@ -69,12 +68,12 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
     public AbstractPluginDiscovery(String pluginSubDir, BiConsumer<ClassLoader, URL> addURLToClassloader) {
         this.pluginDir = Common.connectorJarDir(pluginSubDir);
         this.addURLToClassLoader = addURLToClassloader;
-        LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
+        log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
     }
 
     public AbstractPluginDiscovery(String pluginSubDir) {
         this.pluginDir = Common.connectorJarDir(pluginSubDir);
-        LOGGER.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
+        log.info("Load {} Plugin from {}", getPluginBaseClass().getSimpleName(), pluginDir);
     }
 
     @Override
@@ -98,7 +97,7 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
         T pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
         if (pluginInstance != null) {
-            LOGGER.info("Load plugin: {} from classpath", pluginIdentifier);
+            log.info("Load plugin: {} from classpath", pluginIdentifier);
             return pluginInstance;
         }
         Optional<URL> pluginJarPath = getPluginJarPath(pluginIdentifier);
@@ -108,13 +107,13 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
                 // use current thread classloader to avoid different classloader load same class error.
                 this.addURLToClassLoader.accept(classLoader, pluginJarPath.get());
             } catch (Exception e) {
-                LOGGER.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
+                log.warn("can't load jar use current thread classloader, use URLClassLoader instead now." +
                     " message: " + e.getMessage());
                 classLoader = new URLClassLoader(new URL[]{pluginJarPath.get()}, Thread.currentThread().getContextClassLoader());
             }
             pluginInstance = loadPluginInstance(pluginIdentifier, classLoader);
             if (pluginInstance != null) {
-                LOGGER.info("Load plugin: {} from path: {} use classloader: {}",
+                log.info("Load plugin: {} from path: {} use classloader: {}",
                     pluginIdentifier, pluginJarPath.get(), classLoader.getClass().getName());
                 return pluginInstance;
             }
@@ -201,10 +200,10 @@ public abstract class AbstractPluginDiscovery<T> implements PluginDiscovery<T> {
         }
         try {
             URL pluginJarPath = targetPluginFiles[0].toURI().toURL();
-            LOGGER.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
+            log.info("Discovery plugin jar: {} at: {}", pluginIdentifier.getPluginName(), pluginJarPath);
             return Optional.of(pluginJarPath);
         } catch (MalformedURLException e) {
-            LOGGER.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
+            log.warn("Cannot get plugin URL: " + targetPluginFiles[0], e);
             return Optional.empty();
         }
     }
diff --git a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
index 7c5d7f2e3..c55f9b8da 100644
--- a/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
+++ b/seatunnel-transforms/seatunnel-transforms-flink/seatunnel-transform-flink-udf/src/main/java/org/apache/seatunnel/flink/transform/UDF.java
@@ -26,13 +26,12 @@ import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.functions.UserDefinedFunction;
 import org.apache.flink.types.Row;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -40,9 +39,9 @@ import java.util.Map;
 import java.util.Properties;
 
 @SuppressWarnings("PMD")
+@Slf4j
 public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(UDF.class);
     private static final String UDF_CONFIG_PREFIX = "function.";
 
     private Config config;
@@ -68,7 +67,7 @@ public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
             try {
                 tEnv.createTemporarySystemFunction(functionNames.get(i), (Class<? extends UserDefinedFunction>) Class.forName(classNames.get(i)));
             } catch (ClassNotFoundException e) {
-                LOGGER.error("The udf class {} not founded, make sure you enter the correct class name", classNames.get(i));
+                log.error("The udf class {} not founded, make sure you enter the correct class name", classNames.get(i));
                 throw new RuntimeException(e);
             }
         }
@@ -109,9 +108,9 @@ public class UDF implements FlinkStreamTransform, FlinkBatchTransform {
         return "udf";
     }
 
-    private void hasSubConfig(String prefix){
+    private void hasSubConfig(String prefix) {
         for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
-            if (entry.getKey().startsWith(prefix)){
+            if (entry.getKey().startsWith(prefix)) {
                 return;
             }
         }
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
index 4cdfeb301..2f47df964 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkCommitter.java
@@ -19,19 +19,17 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import org.apache.seatunnel.api.sink.SinkCommitter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.Committer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkCommitter.class);
-
     private final SinkCommitter<CommT> sinkCommitter;
 
     FlinkCommitter(SinkCommitter<CommT> sinkCommitter) {
@@ -44,7 +42,7 @@ public class FlinkCommitter<CommT> implements Committer<CommitWrapper<CommT>> {
             .map(CommitWrapper::getCommit)
             .collect(Collectors.toList()));
         if (reCommittable != null && !reCommittable.isEmpty()) {
-            LOGGER.warn("this version not support re-commit when use flink engine");
+            log.warn("this version not support re-commit when use flink engine");
         }
         // TODO re-commit the data
         return new ArrayList<>();
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
index f71634351..d9a060d7a 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkGlobalCommitter.java
@@ -19,9 +19,8 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -29,10 +28,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+@Slf4j
 public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter<CommitWrapper<CommT>, GlobalCommT> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkGlobalCommitter.class);
-
     private final SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter;
 
     FlinkGlobalCommitter(SinkAggregatedCommitter<CommT, GlobalCommT> aggregatedCommitter) {
@@ -55,7 +53,7 @@ public class FlinkGlobalCommitter<CommT, GlobalCommT> implements GlobalCommitter
     public List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException {
         List<GlobalCommT> reCommittable = aggregatedCommitter.commit(globalCommittables);
         if (reCommittable != null && !reCommittable.isEmpty()) {
-            LOGGER.warn("this version not support re-commit when use flink engine");
+            log.warn("this version not support re-commit when use flink engine");
         }
         // TODO re-commit the data
         return new ArrayList<>();
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
index 92494d49e..81c491e74 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/source/batch/ParallelBatchPartitionReader.java
@@ -26,9 +26,8 @@ import org.apache.seatunnel.translation.source.ParallelSource;
 import org.apache.seatunnel.translation.spark.common.InternalRowCollector;
 import org.apache.seatunnel.translation.util.ThreadPoolExecutorFactory;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -36,10 +35,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+@Slf4j
 public class ParallelBatchPartitionReader {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ParallelBatchPartitionReader.class);
-
     protected static final Integer INTERVAL = 100;
 
     protected final SeaTunnelSource<SeaTunnelRow, ?, ?> source;
@@ -97,7 +95,7 @@ public class ParallelBatchPartitionReader {
                 internalSource.run(new InternalRowCollector(handover, checkpointLock, source.getProducedType()));
             } catch (Exception e) {
                 handover.reportError(e);
-                LOGGER.error("BatchPartitionReader execute failed.", e);
+                log.error("BatchPartitionReader execute failed.", e);
                 running = false;
             }
         });